diff --git ql/pom.xml ql/pom.xml index 06d7f27..a8e779c 100644 --- ql/pom.xml +++ ql/pom.xml @@ -61,6 +61,11 @@ hive-shims ${project.version} + + org.apache.hive + spark-client + ${project.version} + com.esotericsoftware.kryo diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java new file mode 100644 index 0000000..aa826f7 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0
+ *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.spark; + +import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef; +import org.apache.hadoop.hive.ql.plan.SparkWork; + +import java.io.Serializable; + +public interface HiveSparkClient extends Serializable{ + /** + * HiveSparkClient should generate Spark RDD graph by given sparkWork and driverContext, + * and submit RDD graph to Spark cluster. + * @param driverContext + * @param sparkWork + * @return SparkJobRef could be used to track spark job progress and metrics. + * @throws Exception + */ + public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) throws Exception; + + public void close(); +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java new file mode 100644 index 0000000..2d95b39 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0
+ *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.spark; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.spark.SparkConf; +import org.apache.spark.SparkContext; +import org.apache.spark.SparkException; +import scala.Tuple2; + +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; + +public class HiveSparkClientFactory { + protected static transient final Log LOG = LogFactory + .getLog(HiveSparkClientFactory.class); + + private static final String SPARK_DEFAULT_CONF_FILE = "spark-defaults.conf"; + private static final String SPARK_DEFAULT_MASTER = "local"; + private static final String SAPRK_DEFAULT_APP_NAME = "Hive on Spark"; + + public static HiveSparkClient createHiveSparkClient(Configuration configuration) + throws IOException, SparkException { + + Map conf = initiateSparkConf(configuration); + // Submit spark job through local spark context while spark master is local mode, otherwise submit + // spark job through remote spark context. + String master = conf.get("spark.master"); + if (master.equals("local") || master.startsWith("local[")) { + // With local spark context, all user sessions share the same spark context. + return LocalHiveSparkClient.getInstance(generateSparkConf(conf)); + } else { + return new RemoteHiveSparkClient(conf); + } + } + + private static Map initiateSparkConf(Configuration hiveConf) { + Map sparkConf = new HashMap(); + + // set default spark configurations. + sparkConf.put("spark.master", SPARK_DEFAULT_MASTER); + sparkConf.put("spark.app.name", SAPRK_DEFAULT_APP_NAME); + sparkConf.put("spark.serializer", + "org.apache.spark.serializer.KryoSerializer"); + sparkConf.put("spark.default.parallelism", "1"); + // load properties from spark-defaults.conf. + InputStream inputStream = null; + try { + inputStream = HiveSparkClientFactory.class.getClassLoader() + .getResourceAsStream(SPARK_DEFAULT_CONF_FILE); + if (inputStream != null) { + LOG.info("loading spark properties from:" + SPARK_DEFAULT_CONF_FILE); + Properties properties = new Properties(); + properties.load(inputStream); + for (String propertyName : properties.stringPropertyNames()) { + if (propertyName.startsWith("spark")) { + String value = properties.getProperty(propertyName); + sparkConf.put(propertyName, properties.getProperty(propertyName)); + LOG.info(String.format( + "load spark configuration from %s (%s -> %s).", + SPARK_DEFAULT_CONF_FILE, propertyName, value)); + } + } + } + } catch (IOException e) { + LOG.info("Failed to open spark configuration file:" + + SPARK_DEFAULT_CONF_FILE, e); + } finally { + if (inputStream != null) { + try { + inputStream.close(); + } catch (IOException e) { + LOG.debug("Failed to close inputstream.", e); + } + } + } + + // load properties from hive configurations. 
+ Iterator> iterator = hiveConf.iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + String propertyName = entry.getKey(); + if (propertyName.startsWith("spark")) { + String value = entry.getValue(); + sparkConf.put(propertyName, value); + LOG.info(String.format( + "load spark configuration from hive configuration (%s -> %s).", + propertyName, value)); + } + } + + return sparkConf; + } + + private static SparkConf generateSparkConf(Map conf) { + SparkConf sparkConf = new SparkConf(false); + for (Map.Entry entry : conf.entrySet()) { + sparkConf.set(entry.getKey(), entry.getValue()); + } + return sparkConf; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java new file mode 100644 index 0000000..c042707 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java @@ -0,0 +1,230 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.spark; + +import com.google.common.base.Splitter; +import com.google.common.base.Strings; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters; +import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef; +import org.apache.hadoop.hive.ql.exec.spark.status.impl.JobStateListener; +import org.apache.hadoop.hive.ql.exec.spark.status.impl.SimpleSparkJobStatus; +import org.apache.hadoop.hive.ql.io.HiveKey; +import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.SparkWork; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.spark.SparkConf; +import org.apache.spark.SparkContext; +import org.apache.spark.SparkException; +import org.apache.spark.api.java.JavaFutureAction; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.ui.jobs.JobProgressListener; +import scala.Tuple2; + +import java.io.IOException; +import java.util.*; + +/** + * LocalSparkClient submit Spark job in local driver, it's responsible for build spark client + * environment and execute spark work. 
+ */ +public class LocalHiveSparkClient implements HiveSparkClient { + private static final long serialVersionUID = 1L; + + private static final String MR_JAR_PROPERTY = "tmpjars"; + protected static transient final Log LOG = LogFactory + .getLog(LocalHiveSparkClient.class); + + private static final Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings(); + + private static LocalHiveSparkClient client; + + public static synchronized LocalHiveSparkClient getInstance(SparkConf sparkConf) { + if (client == null) { + client = new LocalHiveSparkClient(sparkConf); + } + return client; + } + + /** + * Get Spark shuffle memory per task, and total number of cores. This + * information can be used to estimate how many reducers a task can have. + * + * @return a tuple, the first element is the shuffle memory per task in bytes, + * the second element is the number of total cores usable by the client + */ + public static Tuple2 + getMemoryAndCores(Configuration hiveConf) { + LocalHiveSparkClient client = null; + try { + client = (LocalHiveSparkClient)HiveSparkClientFactory.createHiveSparkClient(hiveConf); + } catch (IOException e) { + e.printStackTrace(); + } catch (SparkException e) { + e.printStackTrace(); + } + SparkContext sc = client.sc.sc(); + SparkConf sparkConf = sc.conf(); + int cores = sparkConf.getInt("spark.executor.cores", 1); + double memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.2); + // sc.executorMemory() is in MB, need to convert to bytes + long memoryPerTask = + (long) (sc.executorMemory() * memoryFraction * 1024 * 1024 / cores); + int executors = sc.getExecutorMemoryStatus().size(); + int totalCores = executors * cores; + LOG.info("Spark cluster current has executors: " + executors + + ", cores per executor: " + cores + ", memory per executor: " + + sc.executorMemory() + "M, shuffle memoryFraction: " + memoryFraction); + return new Tuple2(Long.valueOf(memoryPerTask), + Integer.valueOf(totalCores)); + } + + private JavaSparkContext sc; + + private List localJars = new ArrayList(); + + private List localFiles = new ArrayList(); + + private JobStateListener jobStateListener; + + private JobProgressListener jobProgressListener; + + private LocalHiveSparkClient(SparkConf sparkConf) { + sc = new JavaSparkContext(sparkConf); + jobStateListener = new JobStateListener(); + jobProgressListener = new JobProgressListener(sparkConf); + sc.sc().listenerBus().addListener(jobStateListener); + sc.sc().listenerBus().addListener(jobProgressListener); + } + + @Override + public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) throws Exception { + Context ctx = driverContext.getCtx(); + HiveConf hiveConf = (HiveConf) ctx.getConf(); + refreshLocalResources(sparkWork, hiveConf); + JobConf jobConf = new JobConf(hiveConf); + + // Create temporary scratch dir + Path emptyScratchDir; + emptyScratchDir = ctx.getMRTmpPath(); + FileSystem fs = emptyScratchDir.getFileSystem(jobConf); + fs.mkdirs(emptyScratchDir); + + SparkCounters sparkCounters = new SparkCounters(sc, hiveConf); + Map> prefixes = sparkWork.getRequiredCounterPrefix(); + if (prefixes != null) { + for (String group : prefixes.keySet()) { + for (String counterName : prefixes.get(group)) { + sparkCounters.createCounter(group, counterName); + } + } + } + SparkReporter sparkReporter = new SparkReporter(sparkCounters); + + // Generate Spark plan + SparkPlanGenerator gen = + new SparkPlanGenerator(sc, ctx, jobConf, emptyScratchDir, sparkReporter); + SparkPlan plan = gen.generate(sparkWork); + + // Execute 
generated plan. + JavaPairRDD finalRDD = plan.generateGraph(); + // We use Spark RDD async action to submit job as it's the only way to get jobId now. + JavaFutureAction future = finalRDD.foreachAsync(HiveVoidFunction.getInstance()); + // As we always use foreach action to submit RDD graph, it would only trigger on job. + int jobId = future.jobIds().get(0); + SimpleSparkJobStatus sparkJobStatus = + new SimpleSparkJobStatus(jobId, jobStateListener, jobProgressListener, sparkCounters, future); + return new SparkJobRef(Integer.toString(jobId), sparkJobStatus); + } + + /** + * At this point single SparkContext is used by more than one thread, so make this + * method synchronized. + * + * TODO: This method can't remove a jar/resource from SparkContext. Looks like this is an + * issue we have to live with until multiple SparkContexts are supported in a single JVM. + */ + private synchronized void refreshLocalResources(SparkWork sparkWork, HiveConf conf) { + // add hive-exec jar + addJars((new JobConf(this.getClass())).getJar()); + + // add aux jars + addJars(HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS)); + + // add added jars + String addedJars = Utilities.getResourceFiles(conf, SessionState.ResourceType.JAR); + HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDJARS, addedJars); + addJars(addedJars); + + // add plugin module jars on demand + // jobConf will hold all the configuration for hadoop, tez, and hive + JobConf jobConf = new JobConf(conf); + jobConf.set(MR_JAR_PROPERTY, ""); + for (BaseWork work : sparkWork.getAllWork()) { + work.configureJobConf(jobConf); + } + addJars(conf.get(MR_JAR_PROPERTY)); + + // add added files + String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE); + HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDFILES, addedFiles); + addResources(addedFiles); + + // add added archives + String addedArchives = Utilities.getResourceFiles(conf, SessionState.ResourceType.ARCHIVE); + HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDARCHIVES, addedArchives); + addResources(addedArchives); + } + + private void addResources(String addedFiles) { + for (String addedFile : CSV_SPLITTER.split(Strings.nullToEmpty(addedFiles))) { + if (!localFiles.contains(addedFile)) { + localFiles.add(addedFile); + sc.addFile(addedFile); + } + } + } + + private void addJars(String addedJars) { + for (String addedJar : CSV_SPLITTER.split(Strings.nullToEmpty(addedJars))) { + if (!localJars.contains(addedJar)) { + localJars.add(addedJar); + sc.addJar(addedJar); + } + } + } + + @Override + public void close() { + sc.stop(); + client = null; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java new file mode 100644 index 0000000..3972e30 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java @@ -0,0 +1,191 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0
+ *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.spark; + +import com.google.common.base.Splitter; +import com.google.common.base.Strings; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters; +import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef; +import org.apache.hadoop.hive.ql.io.HiveKey; +import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.SparkWork; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hive.spark.client.Job; +import org.apache.hive.spark.client.JobContext; +import org.apache.hive.spark.client.JobHandle; +import org.apache.hive.spark.client.SparkClient; +import org.apache.hive.spark.client.SparkClientFactory; +import org.apache.spark.SparkException; +import org.apache.spark.api.java.JavaFutureAction; +import org.apache.spark.api.java.JavaPairRDD; + +import java.io.IOException; +import java.io.Serializable; +import java.net.MalformedURLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * RemoteSparkClient is a wrapper of {@link org.apache.hive.spark.client.SparkClient}, which + * wrap a spark job request and send to an remote SparkContext. 
+ */ +public class RemoteHiveSparkClient implements HiveSparkClient { + private static final long serialVersionUID = 1L; + + private static final String MR_JAR_PROPERTY = "tmpjars"; + protected static transient final Log LOG = LogFactory + .getLog(RemoteHiveSparkClient.class); + + private static transient final Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings(); + + private transient SparkClient remoteClient; + + private transient List localJars = new ArrayList(); + + private transient List localFiles = new ArrayList(); + + RemoteHiveSparkClient(Map sparkConf) throws IOException, SparkException { + SparkClientFactory.initialize(sparkConf); + remoteClient = SparkClientFactory.createClient(sparkConf); + } + + @Override + public SparkJobRef execute(final DriverContext driverContext, final SparkWork sparkWork) throws Exception { + final Context ctx = driverContext.getCtx(); + final HiveConf hiveConf = (HiveConf) ctx.getConf(); + refreshLocalResources(sparkWork, hiveConf); + final JobConf jobConf = new JobConf(hiveConf); + + // Create temporary scratch dir + final Path emptyScratchDir = ctx.getMRTmpPath(); + FileSystem fs = emptyScratchDir.getFileSystem(jobConf); + fs.mkdirs(emptyScratchDir); + + final byte[] jobConfBytes = KryoSerializer.serializeJobConf(jobConf); + final byte[] scratchDirBytes = KryoSerializer.serialize(emptyScratchDir); + final byte[] sparkWorkBytes = KryoSerializer.serialize(sparkWork); + + JobHandle jobHandle = remoteClient.submit(new Job() { + @Override + public Serializable call(JobContext jc) throws Exception { + JobConf localJobConf = KryoSerializer.deserializeJobConf(jobConfBytes); + Path localScratchDir = KryoSerializer.deserialize(scratchDirBytes, Path.class); + SparkWork localSparkWork = KryoSerializer.deserialize(sparkWorkBytes, SparkWork.class); + + SparkCounters sparkCounters = new SparkCounters(jc.sc(), localJobConf); + Map> prefixes = localSparkWork.getRequiredCounterPrefix(); + if (prefixes != null) { + for (String group : prefixes.keySet()) { + for (String counterName : prefixes.get(group)) { + sparkCounters.createCounter(group, counterName); + } + } + } + SparkReporter sparkReporter = new SparkReporter(sparkCounters); + + // Generate Spark plan + SparkPlanGenerator gen = + new SparkPlanGenerator(jc.sc(), null, localJobConf, localScratchDir, sparkReporter); + SparkPlan plan = gen.generate(localSparkWork); + + // Execute generated plan. + JavaPairRDD finalRDD = plan.generateGraph(); + // We use Spark RDD async action to submit job as it's the only way to get jobId now. 
+ JavaFutureAction future = finalRDD.foreachAsync(HiveVoidFunction.getInstance()); + jc.monitor(future); + return null; + } + }); + jobHandle.get(); + return new SparkJobRef(jobHandle.getClientJobId()); + } + + private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) { + // add hive-exec jar + addJars((new JobConf(this.getClass())).getJar()); + + // add aux jars + addJars(HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS)); + + // add added jars + String addedJars = Utilities.getResourceFiles(conf, SessionState.ResourceType.JAR); + HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDJARS, addedJars); + addJars(addedJars); + + // add plugin module jars on demand + // jobConf will hold all the configuration for hadoop, tez, and hive + JobConf jobConf = new JobConf(conf); + jobConf.set(MR_JAR_PROPERTY, ""); + for (BaseWork work : sparkWork.getAllWork()) { + work.configureJobConf(jobConf); + } + addJars(conf.get(MR_JAR_PROPERTY)); + + // add added files + String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE); + HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDFILES, addedFiles); + addResources(addedFiles); + + // add added archives + String addedArchives = Utilities.getResourceFiles(conf, SessionState.ResourceType.ARCHIVE); + HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDARCHIVES, addedArchives); + addResources(addedArchives); + } + + private void addResources(String addedFiles) { + for (String addedFile : CSV_SPLITTER.split(Strings.nullToEmpty(addedFiles))) { + if (!localFiles.contains(addedFile)) { + localFiles.add(addedFile); + try { + remoteClient.addFile(SparkUtilities.getURL(addedFile)); + } catch (MalformedURLException e) { + LOG.warn("Failed to add file:" + addedFile); + } + } + } + } + + private void addJars(String addedJars) { + for (String addedJar : CSV_SPLITTER.split(Strings.nullToEmpty(addedJars))) { + if (!localJars.contains(addedJar)) { + localJars.add(addedJar); + try { + remoteClient.addJar(SparkUtilities.getURL(addedJar)); + } catch (MalformedURLException e) { + LOG.warn("Failed to add jar:" + addedJar); + } + } + } + } + + @Override + public void close() { + remoteClient.stop(); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java deleted file mode 100644 index ee16c9e..0000000 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java +++ /dev/null @@ -1,288 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.ql.exec.spark; - -import java.io.IOException; -import java.io.InputStream; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.StatsSetupConst; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.Context; -import org.apache.hadoop.hive.ql.DriverContext; -import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters; -import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef; -import org.apache.hadoop.hive.ql.exec.spark.status.impl.JobStateListener; -import org.apache.hadoop.hive.ql.exec.spark.status.impl.SimpleSparkJobStatus; -import org.apache.hadoop.hive.ql.io.HiveKey; -import org.apache.hadoop.hive.ql.plan.BaseWork; -import org.apache.hadoop.hive.ql.plan.SparkWork; -import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.spark.SparkConf; -import org.apache.spark.SparkContext; -import org.apache.spark.api.java.JavaFutureAction; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.ui.jobs.JobProgressListener; - -import scala.Tuple2; - -import com.google.common.base.Splitter; -import com.google.common.base.Strings; - -public class SparkClient implements Serializable { - private static final long serialVersionUID = 1L; - - private static final String MR_JAR_PROPERTY = "tmpjars"; - protected static transient final Log LOG = LogFactory - .getLog(SparkClient.class); - - private static final Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings(); - - private static final String SPARK_DEFAULT_CONF_FILE = "spark-defaults.conf"; - private static final String SPARK_DEFAULT_MASTER = "local"; - private static final String SAPRK_DEFAULT_APP_NAME = "Hive on Spark"; - - private static SparkClient client; - - public static synchronized SparkClient getInstance(Configuration hiveConf) { - if (client == null) { - client = new SparkClient(hiveConf); - } - return client; - } - - /** - * Get Spark shuffle memory per task, and total number of cores. This - * information can be used to estimate how many reducers a task can have. 
- * - * @return a tuple, the first element is the shuffle memory per task in bytes, - * the second element is the number of total cores usable by the client - */ - public static Tuple2 - getMemoryAndCores(Configuration hiveConf) { - SparkClient client = getInstance(hiveConf); - SparkContext sc = client.sc.sc(); - SparkConf sparkConf = sc.conf(); - int cores = sparkConf.getInt("spark.executor.cores", sc.defaultParallelism()); - double memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.2); - // sc.executorMemory() is in MB, need to convert to bytes - long memoryPerTask = - (long) (sc.executorMemory() * memoryFraction * 1024 * 1024 / cores); - int executors = sc.getExecutorMemoryStatus().size(); - int totalCores = executors * cores; - LOG.info("Spark cluster current has executors: " + executors - + ", cores per executor: " + cores + ", memory per executor: " - + sc.executorMemory() + "M, shuffle memoryFraction: " + memoryFraction); - return new Tuple2(Long.valueOf(memoryPerTask), - Integer.valueOf(totalCores)); - } - - private JavaSparkContext sc; - - private List localJars = new ArrayList(); - - private List localFiles = new ArrayList(); - - private JobStateListener jobStateListener; - - private JobProgressListener jobProgressListener; - - private SparkClient(Configuration hiveConf) { - SparkConf sparkConf = initiateSparkConf(hiveConf); - sc = new JavaSparkContext(sparkConf); - jobStateListener = new JobStateListener(); - jobProgressListener = new JobProgressListener(sparkConf); - sc.sc().listenerBus().addListener(jobStateListener); - sc.sc().listenerBus().addListener(jobProgressListener); - } - - private SparkConf initiateSparkConf(Configuration hiveConf) { - SparkConf sparkConf = new SparkConf(); - - // set default spark configurations. - sparkConf.set("spark.master", SPARK_DEFAULT_MASTER); - sparkConf.set("spark.app.name", SAPRK_DEFAULT_APP_NAME); - sparkConf.set("spark.serializer", - "org.apache.spark.serializer.KryoSerializer"); - sparkConf.set("spark.default.parallelism", "1"); - // load properties from spark-defaults.conf. - InputStream inputStream = null; - try { - inputStream = this.getClass().getClassLoader() - .getResourceAsStream(SPARK_DEFAULT_CONF_FILE); - if (inputStream != null) { - LOG.info("loading spark properties from:" + SPARK_DEFAULT_CONF_FILE); - Properties properties = new Properties(); - properties.load(inputStream); - for (String propertyName : properties.stringPropertyNames()) { - if (propertyName.startsWith("spark")) { - String value = properties.getProperty(propertyName); - sparkConf.set(propertyName, properties.getProperty(propertyName)); - LOG.info(String.format( - "load spark configuration from %s (%s -> %s).", - SPARK_DEFAULT_CONF_FILE, propertyName, value)); - } - } - } - } catch (IOException e) { - LOG.info("Failed to open spark configuration file:" - + SPARK_DEFAULT_CONF_FILE, e); - } finally { - if (inputStream != null) { - try { - inputStream.close(); - } catch (IOException e) { - LOG.debug("Failed to close inputstream.", e); - } - } - } - - // load properties from hive configurations. 
- Iterator> iterator = hiveConf.iterator(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - String propertyName = entry.getKey(); - if (propertyName.startsWith("spark")) { - String value = entry.getValue(); - sparkConf.set(propertyName, value); - LOG.info(String.format( - "load spark configuration from hive configuration (%s -> %s).", - propertyName, value)); - } - } - - return sparkConf; - } - - public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) throws Exception { - Context ctx = driverContext.getCtx(); - HiveConf hiveConf = (HiveConf) ctx.getConf(); - refreshLocalResources(sparkWork, hiveConf); - JobConf jobConf = new JobConf(hiveConf); - - // Create temporary scratch dir - Path emptyScratchDir; - emptyScratchDir = ctx.getMRTmpPath(); - FileSystem fs = emptyScratchDir.getFileSystem(jobConf); - fs.mkdirs(emptyScratchDir); - - SparkCounters sparkCounters = new SparkCounters(sc, hiveConf); - Map> prefixes = sparkWork.getRequiredCounterPrefix(); - // register spark counters before submit spark job. - if (prefixes != null) { - for (String group : prefixes.keySet()) { - for (String counter : prefixes.get(group)) { - sparkCounters.createCounter(group, counter); - } - } - } - SparkReporter sparkReporter = new SparkReporter(sparkCounters); - - // Generate Spark plan - SparkPlanGenerator gen = - new SparkPlanGenerator(sc, ctx, jobConf, emptyScratchDir, sparkReporter); - SparkPlan plan = gen.generate(sparkWork); - - // Execute generated plan. - JavaPairRDD finalRDD = plan.generateGraph(); - // We use Spark RDD async action to submit job as it's the only way to get jobId now. - JavaFutureAction future = finalRDD.foreachAsync(HiveVoidFunction.getInstance()); - // As we always use foreach action to submit RDD graph, it would only trigger on job. - int jobId = future.jobIds().get(0); - SimpleSparkJobStatus sparkJobStatus = - new SimpleSparkJobStatus(jobId, jobStateListener, jobProgressListener, sparkCounters, future); - return new SparkJobRef(jobId, sparkJobStatus); - } - - /** - * At this point single SparkContext is used by more than one thread, so make this - * method synchronized. - * - * TODO: This method can't remove a jar/resource from SparkContext. Looks like this is an - * issue we have to live with until multiple SparkContexts are supported in a single JVM. 
- */ - private synchronized void refreshLocalResources(SparkWork sparkWork, HiveConf conf) { - // add hive-exec jar - addJars((new JobConf(this.getClass())).getJar()); - - // add aux jars - addJars(HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS)); - - // add added jars - String addedJars = Utilities.getResourceFiles(conf, SessionState.ResourceType.JAR); - HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDJARS, addedJars); - addJars(addedJars); - - // add plugin module jars on demand - // jobConf will hold all the configuration for hadoop, tez, and hive - JobConf jobConf = new JobConf(conf); - jobConf.set(MR_JAR_PROPERTY, ""); - for (BaseWork work : sparkWork.getAllWork()) { - work.configureJobConf(jobConf); - } - addJars(conf.get(MR_JAR_PROPERTY)); - - // add added files - String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE); - HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDFILES, addedFiles); - addResources(addedFiles); - - // add added archives - String addedArchives = Utilities.getResourceFiles(conf, SessionState.ResourceType.ARCHIVE); - HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDARCHIVES, addedArchives); - addResources(addedArchives); - } - - private void addResources(String addedFiles) { - for (String addedFile : CSV_SPLITTER.split(Strings.nullToEmpty(addedFiles))) { - if (!localFiles.contains(addedFile)) { - localFiles.add(addedFile); - sc.addFile(addedFile); - } - } - } - - private void addJars(String addedJars) { - for (String addedJar : CSV_SPLITTER.split(Strings.nullToEmpty(addedJars))) { - if (!localJars.contains(addedJar)) { - localJars.add(addedJar); - sc.addJar(addedJar); - } - } - } - - public void close() { - sc.stop(); - client = null; - } -} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java index 2fea62d..3613784 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java @@ -110,15 +110,17 @@ public int execute(DriverContext driverContext) { SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork); SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus(); - sparkCounters = sparkJobStatus.getCounter(); - SparkJobMonitor monitor = new SparkJobMonitor(sparkJobStatus); - monitor.startMonitor(); - SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics(); - if (LOG.isInfoEnabled() && sparkStatistics != null) { - LOG.info(String.format("=====Spark Job[%d] statistics=====", jobRef.getJobId())); - logSparkStatistic(sparkStatistics); + if (sparkJobStatus != null) { + sparkCounters = sparkJobStatus.getCounter(); + SparkJobMonitor monitor = new SparkJobMonitor(sparkJobStatus); + monitor.startMonitor(); + SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics(); + if (LOG.isInfoEnabled() && sparkStatistics != null) { + LOG.info(String.format("=====Spark Job[%s] statistics=====", jobRef.getJobId())); + logSparkStatistic(sparkStatistics); + } + sparkJobStatus.cleanup(); } - sparkJobStatus.cleanup(); rc = 0; } catch (Exception e) { LOG.error("Failed to execute spark task.", e); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java index e3e6d16..ec7877e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java @@ -20,6 +20,9 @@ import 
org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.io.BytesWritable; +import java.net.MalformedURLException; +import java.net.URL; + /** * Contains utilities methods used as part of Spark tasks */ @@ -41,4 +44,17 @@ public static BytesWritable copyBytesWritable(BytesWritable bw) { copy.set(bw); return copy; } + + public static URL getURL(String path) throws MalformedURLException { + URL url = null; + if (path != null) { + if (path.contains(":")) { + url = new URL(path); + } else { + // if no file schema in path, we assume it's file on local fs. + url = new URL("file:" + path); + } + } + return url; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java index 51e0510..7b187a6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java @@ -18,9 +18,11 @@ package org.apache.hadoop.hive.ql.exec.spark.session; import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.DriverContext; -import org.apache.hadoop.hive.ql.exec.spark.SparkClient; +import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory; +import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient; import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef; import org.apache.hadoop.hive.ql.plan.SparkWork; @@ -34,7 +36,7 @@ private HiveConf conf; private boolean isOpen; private final String sessionId; - private SparkClient sparkClient; + private HiveSparkClient hiveSparkClient; public SparkSessionImpl() { sessionId = makeSessionId(); @@ -49,8 +51,9 @@ public void open(HiveConf conf) { @Override public SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) throws Exception { Preconditions.checkState(isOpen, "Session is not open. 
Can't submit jobs."); - sparkClient = SparkClient.getInstance(driverContext.getCtx().getConf()); - return sparkClient.execute(driverContext, sparkWork); + Configuration hiveConf = driverContext.getCtx().getConf(); + hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(hiveConf); + return hiveSparkClient.execute(driverContext, sparkWork); } @Override @@ -71,10 +74,10 @@ public String getSessionId() { @Override public void close() { isOpen = false; - if (sparkClient != null) { - sparkClient.close(); + if (hiveSparkClient != null) { + hiveSparkClient.close(); } - sparkClient = null; + hiveSparkClient = null; } public static String makeSessionId() { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java index bf43b6e..d16d1b4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java @@ -19,26 +19,26 @@ public class SparkJobRef { - private int jobId; + private String jobId; private SparkJobStatus sparkJobStatus; public SparkJobRef() {} - public SparkJobRef(int jobId) { + public SparkJobRef(String jobId) { this.jobId = jobId; } - public SparkJobRef(int jobId, SparkJobStatus sparkJobStatus) { + public SparkJobRef(String jobId, SparkJobStatus sparkJobStatus) { this.jobId = jobId; this.sparkJobStatus = sparkJobStatus; } - public int getJobId() { + public String getJobId() { return jobId; } - public void setJobId(int jobId) { + public void setJobId(String jobId) { this.jobId = jobId; } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java index d4d14a3..6a92a6e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java @@ -26,7 +26,7 @@ import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.spark.SparkClient; +import org.apache.hadoop.hive.ql.exec.spark.LocalHiveSparkClient; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; @@ -74,34 +74,34 @@ public Object process(Node nd, Stack stack, LOG.info("Parallelism for reduce sink " + sink + " set by user to " + constantReducers); desc.setNumReducers(constantReducers); } else { - long numberOfBytes = 0; - - // we need to add up all the estimates from the siblings of this reduce sink - for (Operator sibling: - sink.getChildOperators().get(0).getParentOperators()) { - if (sibling.getStatistics() != null) { - numberOfBytes += sibling.getStatistics().getDataSize(); - } else { - LOG.warn("No stats available from: " + sibling); - } - } - - if (sparkMemoryAndCores == null) { - sparkMemoryAndCores = SparkClient.getMemoryAndCores(context.getConf()); - } - - // Divide it by 2 so that we can have more reducers - long bytesPerReducer = sparkMemoryAndCores._1.longValue() / 2; - int numReducers = Utilities.estimateReducers(numberOfBytes, bytesPerReducer, - maxReducers, false); - - // If there are more cores, use the number of cores - int cores = sparkMemoryAndCores._2.intValue(); - if (numReducers < cores) { - numReducers = cores; - } - LOG.info("Set parallelism for reduce 
sink " + sink + " to: " + numReducers); - desc.setNumReducers(numReducers); +// long numberOfBytes = 0; +// +// // we need to add up all the estimates from the siblings of this reduce sink +// for (Operator sibling: +// sink.getChildOperators().get(0).getParentOperators()) { +// if (sibling.getStatistics() != null) { +// numberOfBytes += sibling.getStatistics().getDataSize(); +// } else { +// LOG.warn("No stats available from: " + sibling); +// } +// } +// +// if (sparkMemoryAndCores == null) { +// sparkMemoryAndCores = SparkClient.getMemoryAndCores(context.getConf()); +// } +// +// // Divide it by 2 so that we can have more reducers +// long bytesPerReducer = sparkMemoryAndCores._1.longValue() / 2; +// int numReducers = Utilities.estimateReducers(numberOfBytes, bytesPerReducer, +// maxReducers, false); +// +// // If there are more cores, use the number of cores +// int cores = sparkMemoryAndCores._2.intValue(); +// if (numReducers < cores) { +// numReducers = cores; +// } +// LOG.info("Set parallelism for reduce sink " + sink + " to: " + numReducers); +// desc.setNumReducers(numReducers); } } else { LOG.info("Number of reducers determined to be: " + desc.getNumReducers()); diff --git spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java index 8346b28..982adbd 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java +++ spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java @@ -27,7 +27,7 @@ * Defines the API for the Spark remote client. */ @InterfaceAudience.Private -public interface SparkClient { +public interface SparkClient extends Serializable { /** * Submits a job for asynchronous execution.
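
For orientation, a minimal usage sketch of the abstraction this patch introduces (not part of the patch; the class and method names HiveSparkClientUsageSketch/run are hypothetical, the Hive types and calls are the ones added above). HiveSparkClientFactory.createHiveSparkClient inspects spark.master and returns LocalHiveSparkClient for "local"/"local[N]" masters (a shared in-process SparkContext) or RemoteHiveSparkClient otherwise; callers program only against the HiveSparkClient interface, which is how SparkSessionImpl now submits work.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient;
import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
import org.apache.hadoop.hive.ql.plan.SparkWork;

public class HiveSparkClientUsageSketch {
  public static void run(DriverContext driverContext, SparkWork sparkWork) throws Exception {
    Configuration conf = driverContext.getCtx().getConf();
    // The factory picks the implementation from spark.master:
    // "local" / "local[N]" -> LocalHiveSparkClient, anything else -> RemoteHiveSparkClient.
    HiveSparkClient client = HiveSparkClientFactory.createHiveSparkClient(conf);
    try {
      // Builds the Spark RDD graph for sparkWork and submits it to the cluster.
      SparkJobRef jobRef = client.execute(driverContext, sparkWork);
      System.out.println("Submitted Spark job " + jobRef.getJobId());
    } finally {
      client.close();
    }
  }
}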
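The remote path in RemoteHiveSparkClient.execute() ships the serialized plan to the remote SparkContext inside a spark-client Job. The outline below is illustrative only: RemoteSubmitSketch/submit are hypothetical names, the generic parameters on Job/JobHandle are reconstructed (the pasted diff stripped them), KryoSerializer is assumed to live in the same ql.exec.spark package since the patch uses it unqualified, and the real method also serializes the JobConf and scratch-dir Path, which are omitted here.

import java.io.Serializable;
import org.apache.hadoop.hive.ql.exec.spark.KryoSerializer;
import org.apache.hadoop.hive.ql.plan.SparkWork;
import org.apache.hive.spark.client.Job;
import org.apache.hive.spark.client.JobContext;
import org.apache.hive.spark.client.JobHandle;
import org.apache.hive.spark.client.SparkClient;

public class RemoteSubmitSketch {
  // Serialize the plan up front so the Job closure only captures byte arrays,
  // then rebuild it inside call() once it reaches the remote SparkContext.
  static String submit(SparkClient remoteClient, SparkWork sparkWork) throws Exception {
    final byte[] sparkWorkBytes = KryoSerializer.serialize(sparkWork);
    JobHandle<Serializable> handle = remoteClient.submit(new Job<Serializable>() {
      @Override
      public Serializable call(JobContext jc) throws Exception {
        SparkWork work = KryoSerializer.deserialize(sparkWorkBytes, SparkWork.class);
        // ...generate the RDD graph from 'work' and run it via jc.sc(),
        // registering counters and calling jc.monitor(...) as the patch does...
        return null;
      }
    });
    handle.get();                     // block until the remote job completes
    return handle.getClientJobId();   // becomes the String jobId carried by SparkJobRef
  }
}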
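The new SparkUtilities.getURL helper, used when registering jars and files with the remote client, treats any path containing a scheme separator as a URL and prefixes everything else with "file:". A small illustration (class name hypothetical):

import java.net.MalformedURLException;
import java.net.URL;
import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;

public class GetUrlExample {
  public static void main(String[] args) throws MalformedURLException {
    // Path already carries a scheme (contains ':'): parsed as-is.
    URL explicit = SparkUtilities.getURL("file:///tmp/my-udf.jar");
    // No scheme: assumed to be a file on the local file system.
    URL implicit = SparkUtilities.getURL("/tmp/my-udf.jar");
    System.out.println(explicit);  // file:/tmp/my-udf.jar
    System.out.println(implicit);  // file:/tmp/my-udf.jar
  }
}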