diff --git ql/pom.xml ql/pom.xml
index 06d7f27..a8e779c 100644
--- ql/pom.xml
+++ ql/pom.xml
@@ -61,6 +61,11 @@
      <artifactId>hive-shims</artifactId>
      <version>${project.version}</version>
    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>spark-client</artifactId>
+      <version>${project.version}</version>
+    </dependency>
    <dependency>
      <groupId>com.esotericsoftware.kryo</groupId>
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java
new file mode 100644
index 0000000..fc63180
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+
+import java.io.Closeable;
+import java.io.Serializable;
+
+public interface HiveSparkClient extends Serializable, Closeable {
+ /**
+ * HiveSparkClient should generate a Spark RDD graph from the given sparkWork and driverContext,
+ * and submit that RDD graph to the Spark cluster.
+ * @param driverContext
+ * @param sparkWork
+ * @return a SparkJobRef that can be used to track Spark job progress and metrics.
+ * @throws Exception
+ */
+ public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) throws Exception;
+}
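
For reference, a minimal caller-side sketch of this interface, assuming a DriverContext and SparkWork have already been built elsewhere; the wrapper class name is hypothetical and only the factory, execute(), getJobId() and close() calls introduced in this patch are used:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.DriverContext;
    import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient;
    import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
    import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
    import org.apache.hadoop.hive.ql.plan.SparkWork;

    public class HiveSparkClientUsageSketch {
      public static void submit(HiveConf conf, DriverContext driverContext, SparkWork sparkWork)
          throws Exception {
        HiveSparkClient client = HiveSparkClientFactory.createHiveSparkClient(conf);
        try {
          SparkJobRef jobRef = client.execute(driverContext, sparkWork);
          // getSparkJobStatus() may be null for the remote client in this patch, so callers
          // (see the SparkTask change below) should null-check it before monitoring.
          System.out.println("Submitted Spark job " + jobRef.getJobId());
        } finally {
          client.close();
        }
      }
    }
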
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
new file mode 100644
index 0000000..cd423be
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.commons.compress.utils.CharsetNames;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+
+public class HiveSparkClientFactory {
+ protected static transient final Log LOG = LogFactory
+ .getLog(HiveSparkClientFactory.class);
+
+ private static final String SPARK_DEFAULT_CONF_FILE = "spark-defaults.conf";
+ private static final String SPARK_DEFAULT_MASTER = "local";
+ private static final String SPARK_DEFAULT_APP_NAME = "Hive on Spark";
+
+ public static HiveSparkClient createHiveSparkClient(Configuration configuration)
+ throws IOException, SparkException {
+
+ Map<String, String> conf = initiateSparkConf(configuration);
+ // Submit the Spark job through a local SparkContext when the Spark master runs in local mode;
+ // otherwise submit it through the remote Spark context.
+ String master = conf.get("spark.master");
+ if (master.equals("local") || master.startsWith("local[")) {
+ // With local spark context, all user sessions share the same spark context.
+ return LocalHiveSparkClient.getInstance(generateSparkConf(conf));
+ } else {
+ return new RemoteHiveSparkClient(conf);
+ }
+ }
+
+ private static Map<String, String> initiateSparkConf(Configuration hiveConf) {
+ Map<String, String> sparkConf = new HashMap<String, String>();
+
+ // set default spark configurations.
+ sparkConf.put("spark.master", SPARK_DEFAULT_MASTER);
+ sparkConf.put("spark.app.name", SAPRK_DEFAULT_APP_NAME);
+ sparkConf.put("spark.serializer",
+ "org.apache.spark.serializer.KryoSerializer");
+ sparkConf.put("spark.default.parallelism", "1");
+
+ // load properties from spark-defaults.conf.
+ InputStream inputStream = null;
+ try {
+ inputStream = HiveSparkClientFactory.class.getClassLoader()
+ .getResourceAsStream(SPARK_DEFAULT_CONF_FILE);
+ if (inputStream != null) {
+ LOG.info("loading spark properties from:" + SPARK_DEFAULT_CONF_FILE);
+ Properties properties = new Properties();
+ properties.load(new InputStreamReader(inputStream, CharsetNames.UTF_8));
+ for (String propertyName : properties.stringPropertyNames()) {
+ if (propertyName.startsWith("spark")) {
+ String value = properties.getProperty(propertyName);
+ sparkConf.put(propertyName, value);
+ LOG.info(String.format(
+ "load spark configuration from %s (%s -> %s).",
+ SPARK_DEFAULT_CONF_FILE, propertyName, value));
+ }
+ }
+ }
+ } catch (IOException e) {
+ LOG.info("Failed to open spark configuration file:"
+ + SPARK_DEFAULT_CONF_FILE, e);
+ } finally {
+ if (inputStream != null) {
+ try {
+ inputStream.close();
+ } catch (IOException e) {
+ LOG.debug("Failed to close inputstream.", e);
+ }
+ }
+ }
+
+ // load properties from hive configurations.
+ for (Map.Entry<String, String> entry : hiveConf) {
+ String propertyName = entry.getKey();
+ if (propertyName.startsWith("spark")) {
+ String value = entry.getValue();
+ sparkConf.put(propertyName, value);
+ LOG.info(String.format(
+ "load spark configuration from hive configuration (%s -> %s).",
+ propertyName, value));
+ }
+ }
+
+ return sparkConf;
+ }
+
+ private static SparkConf generateSparkConf(Map<String, String> conf) {
+ SparkConf sparkConf = new SparkConf(false);
+ for (Map.Entry<String, String> entry : conf.entrySet()) {
+ sparkConf.set(entry.getKey(), entry.getValue());
+ }
+ return sparkConf;
+ }
+}
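
The factory above layers configuration in a fixed order: hard-coded defaults first, then any "spark*" properties found in spark-defaults.conf on the classpath, then any "spark*" entries from the Hive configuration, each later layer overriding the earlier ones. A small self-contained sketch of that precedence; all property values below are hypothetical:

    import java.util.HashMap;
    import java.util.Map;

    public class SparkConfPrecedenceSketch {
      public static void main(String[] args) {
        Map<String, String> conf = new HashMap<String, String>();
        // 1. Hard-coded defaults, as in HiveSparkClientFactory.initiateSparkConf().
        conf.put("spark.master", "local");
        conf.put("spark.app.name", "Hive on Spark");
        // 2. Entries read from spark-defaults.conf overwrite the defaults (hypothetical value).
        conf.put("spark.master", "spark://example-host:7077");
        // 3. "spark*" entries from the Hive configuration overwrite both (hypothetical value).
        conf.put("spark.app.name", "my-hive-session");

        System.out.println(conf.get("spark.master"));   // spark://example-host:7077
        System.out.println(conf.get("spark.app.name")); // my-hive-session
      }
    }

With a non-local spark.master such as the one above, createHiveSparkClient() takes the RemoteHiveSparkClient path; only "local" or "local[...]" masters share the singleton LocalHiveSparkClient.
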
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
new file mode 100644
index 0000000..797bd11
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
+import org.apache.hadoop.hive.ql.exec.spark.status.impl.JobStateListener;
+import org.apache.hadoop.hive.ql.exec.spark.status.impl.SimpleSparkJobStatus;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.SparkException;
+import org.apache.spark.api.java.JavaFutureAction;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.ui.jobs.JobProgressListener;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * LocalHiveSparkClient submits Spark jobs through a SparkContext running in the local driver;
+ * it is responsible for building the Spark client environment and executing the SparkWork.
+ */
+public class LocalHiveSparkClient implements HiveSparkClient {
+ private static final long serialVersionUID = 1L;
+
+ private static final String MR_JAR_PROPERTY = "tmpjars";
+ protected static transient final Log LOG = LogFactory
+ .getLog(LocalHiveSparkClient.class);
+
+ private static final Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings();
+
+ private static LocalHiveSparkClient client;
+
+ public static synchronized LocalHiveSparkClient getInstance(SparkConf sparkConf) {
+ if (client == null) {
+ client = new LocalHiveSparkClient(sparkConf);
+ }
+ return client;
+ }
+
+ /**
+ * Get Spark shuffle memory per task, and total number of cores. This
+ * information can be used to estimate how many reducers a task can have.
+ *
+ * @return a tuple, the first element is the shuffle memory per task in bytes,
+ * the second element is the number of total cores usable by the client
+ */
+ public Tuple2<Long, Integer> getMemoryAndCores() {
+ SparkContext sparkContext = sc.sc();
+ SparkConf sparkConf = sparkContext.conf();
+ int cores = sparkConf.getInt("spark.executor.cores", 1);
+ double memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.2);
+ // sc.executorMemory() is in MB, need to convert to bytes
+ long memoryPerTask =
+ (long) (sparkContext.executorMemory() * memoryFraction * 1024 * 1024 / cores);
+ int executors = sparkContext.getExecutorMemoryStatus().size();
+ int totalCores = executors * cores;
+ LOG.info("Spark cluster current has executors: " + executors
+ + ", cores per executor: " + cores + ", memory per executor: "
+ + sparkContext.executorMemory() + "M, shuffle memoryFraction: " + memoryFraction);
+ return new Tuple2<Long, Integer>(Long.valueOf(memoryPerTask),
+ Integer.valueOf(totalCores));
+ }
+
+ private JavaSparkContext sc;
+
+ private List<String> localJars = new ArrayList<String>();
+
+ private List<String> localFiles = new ArrayList<String>();
+
+ private JobStateListener jobStateListener;
+
+ private JobProgressListener jobProgressListener;
+
+ private LocalHiveSparkClient(SparkConf sparkConf) {
+ sc = new JavaSparkContext(sparkConf);
+ jobStateListener = new JobStateListener();
+ jobProgressListener = new JobProgressListener(sparkConf);
+ sc.sc().listenerBus().addListener(jobStateListener);
+ sc.sc().listenerBus().addListener(jobProgressListener);
+ }
+
+ @Override
+ public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) throws Exception {
+ Context ctx = driverContext.getCtx();
+ HiveConf hiveConf = (HiveConf) ctx.getConf();
+ refreshLocalResources(sparkWork, hiveConf);
+ JobConf jobConf = new JobConf(hiveConf);
+
+ // Create temporary scratch dir
+ Path emptyScratchDir;
+ emptyScratchDir = ctx.getMRTmpPath();
+ FileSystem fs = emptyScratchDir.getFileSystem(jobConf);
+ fs.mkdirs(emptyScratchDir);
+
+ SparkCounters sparkCounters = new SparkCounters(sc, hiveConf);
+ Map<String, List<String>> prefixes = sparkWork.getRequiredCounterPrefix();
+ if (prefixes != null) {
+ for (String group : prefixes.keySet()) {
+ for (String counterName : prefixes.get(group)) {
+ sparkCounters.createCounter(group, counterName);
+ }
+ }
+ }
+ SparkReporter sparkReporter = new SparkReporter(sparkCounters);
+
+ // Generate Spark plan
+ SparkPlanGenerator gen =
+ new SparkPlanGenerator(sc, ctx, jobConf, emptyScratchDir, sparkReporter);
+ SparkPlan plan = gen.generate(sparkWork);
+
+ // Execute generated plan.
+ JavaPairRDD<HiveKey, BytesWritable> finalRDD = plan.generateGraph();
+ // We use Spark RDD async action to submit job as it's the only way to get jobId now.
+ JavaFutureAction<Void> future = finalRDD.foreachAsync(HiveVoidFunction.getInstance());
+ // As we always use a foreach action to submit the RDD graph, it will only trigger one job.
+ int jobId = future.jobIds().get(0);
+ SimpleSparkJobStatus sparkJobStatus =
+ new SimpleSparkJobStatus(jobId, jobStateListener, jobProgressListener, sparkCounters, future);
+ return new SparkJobRef(Integer.toString(jobId), sparkJobStatus);
+ }
+
+ /**
+ * At this point single SparkContext is used by more than one thread, so make this
+ * method synchronized.
+ *
+ * TODO: This method can't remove a jar/resource from SparkContext. Looks like this is an
+ * issue we have to live with until multiple SparkContexts are supported in a single JVM.
+ */
+ private synchronized void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {
+ // add hive-exec jar
+ addJars((new JobConf(this.getClass())).getJar());
+
+ // add aux jars
+ addJars(HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS));
+
+ // add added jars
+ String addedJars = Utilities.getResourceFiles(conf, SessionState.ResourceType.JAR);
+ HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDJARS, addedJars);
+ addJars(addedJars);
+
+ // add plugin module jars on demand
+ // jobConf will hold all the configuration for hadoop, tez, and hive
+ JobConf jobConf = new JobConf(conf);
+ jobConf.set(MR_JAR_PROPERTY, "");
+ for (BaseWork work : sparkWork.getAllWork()) {
+ work.configureJobConf(jobConf);
+ }
+ addJars(conf.get(MR_JAR_PROPERTY));
+
+ // add added files
+ String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE);
+ HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDFILES, addedFiles);
+ addResources(addedFiles);
+
+ // add added archives
+ String addedArchives = Utilities.getResourceFiles(conf, SessionState.ResourceType.ARCHIVE);
+ HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDARCHIVES, addedArchives);
+ addResources(addedArchives);
+ }
+
+ private void addResources(String addedFiles) {
+ for (String addedFile : CSV_SPLITTER.split(Strings.nullToEmpty(addedFiles))) {
+ if (!localFiles.contains(addedFile)) {
+ localFiles.add(addedFile);
+ sc.addFile(addedFile);
+ }
+ }
+ }
+
+ private void addJars(String addedJars) {
+ for (String addedJar : CSV_SPLITTER.split(Strings.nullToEmpty(addedJars))) {
+ if (!localJars.contains(addedJar)) {
+ localJars.add(addedJar);
+ sc.addJar(addedJar);
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ sc.stop();
+ client = null;
+ }
+}
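
As a worked example of the getMemoryAndCores() estimate above, with hypothetical cluster numbers:

    // Hypothetical numbers, mirroring the arithmetic in LocalHiveSparkClient.getMemoryAndCores().
    public class MemoryAndCoresSketch {
      public static void main(String[] args) {
        long executorMemoryMb = 4096;        // sparkContext.executorMemory(), reported in MB
        double shuffleMemoryFraction = 0.2;  // spark.shuffle.memoryFraction default used above
        int coresPerExecutor = 4;            // spark.executor.cores
        int executors = 3;                   // size of getExecutorMemoryStatus()

        long memoryPerTask =
            (long) (executorMemoryMb * shuffleMemoryFraction * 1024 * 1024 / coresPerExecutor);
        int totalCores = executors * coresPerExecutor;

        System.out.println(memoryPerTask);   // 214748364 bytes, roughly 205 MB per task
        System.out.println(totalCores);      // 12
      }
    }
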
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
new file mode 100644
index 0000000..93d486f
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hive.spark.client.Job;
+import org.apache.hive.spark.client.JobContext;
+import org.apache.hive.spark.client.JobHandle;
+import org.apache.hive.spark.client.SparkClient;
+import org.apache.hive.spark.client.SparkClientFactory;
+import org.apache.spark.SparkException;
+import org.apache.spark.api.java.JavaFutureAction;
+import org.apache.spark.api.java.JavaPairRDD;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.MalformedURLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * RemoteHiveSparkClient is a wrapper around {@link org.apache.hive.spark.client.SparkClient}; it
+ * wraps a Spark job request and sends it to a remote SparkContext.
+ */
+public class RemoteHiveSparkClient implements HiveSparkClient {
+ private static final long serialVersionUID = 1L;
+
+ private static final String MR_JAR_PROPERTY = "tmpjars";
+ protected static transient final Log LOG = LogFactory
+ .getLog(RemoteHiveSparkClient.class);
+
+ private static transient final Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings();
+
+ private transient SparkClient remoteClient;
+
+ private transient List<String> localJars = new ArrayList<String>();
+
+ private transient List<String> localFiles = new ArrayList<String>();
+
+ RemoteHiveSparkClient(Map<String, String> sparkConf) throws IOException, SparkException {
+ SparkClientFactory.initialize(sparkConf);
+ remoteClient = SparkClientFactory.createClient(sparkConf);
+ }
+
+ @Override
+ public SparkJobRef execute(final DriverContext driverContext, final SparkWork sparkWork) throws Exception {
+ final Context ctx = driverContext.getCtx();
+ final HiveConf hiveConf = (HiveConf) ctx.getConf();
+ refreshLocalResources(sparkWork, hiveConf);
+ final JobConf jobConf = new JobConf(hiveConf);
+
+ // Create temporary scratch dir
+ final Path emptyScratchDir = ctx.getMRTmpPath();
+ FileSystem fs = emptyScratchDir.getFileSystem(jobConf);
+ fs.mkdirs(emptyScratchDir);
+
+ final byte[] jobConfBytes = KryoSerializer.serializeJobConf(jobConf);
+ final byte[] scratchDirBytes = KryoSerializer.serialize(emptyScratchDir);
+ final byte[] sparkWorkBytes = KryoSerializer.serialize(sparkWork);
+
+ JobHandle<Serializable> jobHandle = remoteClient.submit(new Job<Serializable>() {
+ @Override
+ public Serializable call(JobContext jc) throws Exception {
+ JobConf localJobConf = KryoSerializer.deserializeJobConf(jobConfBytes);
+ Path localScratchDir = KryoSerializer.deserialize(scratchDirBytes, Path.class);
+ SparkWork localSparkWork = KryoSerializer.deserialize(sparkWorkBytes, SparkWork.class);
+
+ SparkCounters sparkCounters = new SparkCounters(jc.sc(), localJobConf);
+ Map<String, List<String>> prefixes = localSparkWork.getRequiredCounterPrefix();
+ if (prefixes != null) {
+ for (String group : prefixes.keySet()) {
+ for (String counterName : prefixes.get(group)) {
+ sparkCounters.createCounter(group, counterName);
+ }
+ }
+ }
+ SparkReporter sparkReporter = new SparkReporter(sparkCounters);
+
+ // Generate Spark plan
+ SparkPlanGenerator gen =
+ new SparkPlanGenerator(jc.sc(), null, localJobConf, localScratchDir, sparkReporter);
+ SparkPlan plan = gen.generate(localSparkWork);
+
+ // Execute generated plan.
+ JavaPairRDD<HiveKey, BytesWritable> finalRDD = plan.generateGraph();
+ // We use Spark RDD async action to submit job as it's the only way to get jobId now.
+ JavaFutureAction<Void> future = finalRDD.foreachAsync(HiveVoidFunction.getInstance());
+ jc.monitor(future);
+ return null;
+ }
+ });
+ jobHandle.get();
+ return new SparkJobRef(jobHandle.getClientJobId());
+ }
+
+ private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {
+ // add hive-exec jar
+ addJars((new JobConf(this.getClass())).getJar());
+
+ // add aux jars
+ addJars(HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS));
+
+ // add added jars
+ String addedJars = Utilities.getResourceFiles(conf, SessionState.ResourceType.JAR);
+ HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDJARS, addedJars);
+ addJars(addedJars);
+
+ // add plugin module jars on demand
+ // jobConf will hold all the configuration for hadoop, tez, and hive
+ JobConf jobConf = new JobConf(conf);
+ jobConf.set(MR_JAR_PROPERTY, "");
+ for (BaseWork work : sparkWork.getAllWork()) {
+ work.configureJobConf(jobConf);
+ }
+ addJars(conf.get(MR_JAR_PROPERTY));
+
+ // add added files
+ String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE);
+ HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDFILES, addedFiles);
+ addResources(addedFiles);
+
+ // add added archives
+ String addedArchives = Utilities.getResourceFiles(conf, SessionState.ResourceType.ARCHIVE);
+ HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDARCHIVES, addedArchives);
+ addResources(addedArchives);
+ }
+
+ private void addResources(String addedFiles) {
+ for (String addedFile : CSV_SPLITTER.split(Strings.nullToEmpty(addedFiles))) {
+ if (!localFiles.contains(addedFile)) {
+ localFiles.add(addedFile);
+ try {
+ remoteClient.addFile(SparkUtilities.getURL(addedFile));
+ } catch (MalformedURLException e) {
+ LOG.warn("Failed to add file:" + addedFile);
+ }
+ }
+ }
+ }
+
+ private void addJars(String addedJars) {
+ for (String addedJar : CSV_SPLITTER.split(Strings.nullToEmpty(addedJars))) {
+ if (!localJars.contains(addedJar)) {
+ localJars.add(addedJar);
+ try {
+ remoteClient.addJar(SparkUtilities.getURL(addedJar));
+ } catch (MalformedURLException e) {
+ LOG.warn("Failed to add jar:" + addedJar);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ remoteClient.stop();
+ }
+}
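
The remote path above serializes the JobConf, the scratch dir and the SparkWork into byte arrays with KryoSerializer because everything captured by the submitted Job closure must itself be serializable; the plan is then rebuilt and executed inside the remote driver. A stripped-down, hypothetical Job following the same pattern — only the submit(), get() and getClientJobId() calls visible in this patch are assumed:

    import java.io.Serializable;
    import org.apache.hive.spark.client.Job;
    import org.apache.hive.spark.client.JobContext;
    import org.apache.hive.spark.client.JobHandle;
    import org.apache.hive.spark.client.SparkClient;

    public class RemoteJobSketch {
      public static String submitAndWait(SparkClient client, final byte[] payload) throws Exception {
        JobHandle<Serializable> handle = client.submit(new Job<Serializable>() {
          @Override
          public Serializable call(JobContext jc) throws Exception {
            // Runs inside the remote driver; jc.sc() gives access to the remote context,
            // which is where RemoteHiveSparkClient rebuilds and runs its SparkPlan.
            return "payload size: " + payload.length;
          }
        });
        handle.get();                  // block until the remote job finishes, as execute() does
        return handle.getClientJobId();
      }
    }
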
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
deleted file mode 100644
index ee16c9e..0000000
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec.spark;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.StatsSetupConst;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.DriverContext;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
-import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
-import org.apache.hadoop.hive.ql.exec.spark.status.impl.JobStateListener;
-import org.apache.hadoop.hive.ql.exec.spark.status.impl.SimpleSparkJobStatus;
-import org.apache.hadoop.hive.ql.io.HiveKey;
-import org.apache.hadoop.hive.ql.plan.BaseWork;
-import org.apache.hadoop.hive.ql.plan.SparkWork;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.spark.SparkConf;
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.JavaFutureAction;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.ui.jobs.JobProgressListener;
-
-import scala.Tuple2;
-
-import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
-
-public class SparkClient implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private static final String MR_JAR_PROPERTY = "tmpjars";
- protected static transient final Log LOG = LogFactory
- .getLog(SparkClient.class);
-
- private static final Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings();
-
- private static final String SPARK_DEFAULT_CONF_FILE = "spark-defaults.conf";
- private static final String SPARK_DEFAULT_MASTER = "local";
- private static final String SAPRK_DEFAULT_APP_NAME = "Hive on Spark";
-
- private static SparkClient client;
-
- public static synchronized SparkClient getInstance(Configuration hiveConf) {
- if (client == null) {
- client = new SparkClient(hiveConf);
- }
- return client;
- }
-
- /**
- * Get Spark shuffle memory per task, and total number of cores. This
- * information can be used to estimate how many reducers a task can have.
- *
- * @return a tuple, the first element is the shuffle memory per task in bytes,
- * the second element is the number of total cores usable by the client
- */
- public static Tuple2<Long, Integer>
- getMemoryAndCores(Configuration hiveConf) {
- SparkClient client = getInstance(hiveConf);
- SparkContext sc = client.sc.sc();
- SparkConf sparkConf = sc.conf();
- int cores = sparkConf.getInt("spark.executor.cores", sc.defaultParallelism());
- double memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.2);
- // sc.executorMemory() is in MB, need to convert to bytes
- long memoryPerTask =
- (long) (sc.executorMemory() * memoryFraction * 1024 * 1024 / cores);
- int executors = sc.getExecutorMemoryStatus().size();
- int totalCores = executors * cores;
- LOG.info("Spark cluster current has executors: " + executors
- + ", cores per executor: " + cores + ", memory per executor: "
- + sc.executorMemory() + "M, shuffle memoryFraction: " + memoryFraction);
- return new Tuple2<Long, Integer>(Long.valueOf(memoryPerTask),
- Integer.valueOf(totalCores));
- }
-
- private JavaSparkContext sc;
-
- private List<String> localJars = new ArrayList<String>();
-
- private List<String> localFiles = new ArrayList<String>();
-
- private JobStateListener jobStateListener;
-
- private JobProgressListener jobProgressListener;
-
- private SparkClient(Configuration hiveConf) {
- SparkConf sparkConf = initiateSparkConf(hiveConf);
- sc = new JavaSparkContext(sparkConf);
- jobStateListener = new JobStateListener();
- jobProgressListener = new JobProgressListener(sparkConf);
- sc.sc().listenerBus().addListener(jobStateListener);
- sc.sc().listenerBus().addListener(jobProgressListener);
- }
-
- private SparkConf initiateSparkConf(Configuration hiveConf) {
- SparkConf sparkConf = new SparkConf();
-
- // set default spark configurations.
- sparkConf.set("spark.master", SPARK_DEFAULT_MASTER);
- sparkConf.set("spark.app.name", SAPRK_DEFAULT_APP_NAME);
- sparkConf.set("spark.serializer",
- "org.apache.spark.serializer.KryoSerializer");
- sparkConf.set("spark.default.parallelism", "1");
- // load properties from spark-defaults.conf.
- InputStream inputStream = null;
- try {
- inputStream = this.getClass().getClassLoader()
- .getResourceAsStream(SPARK_DEFAULT_CONF_FILE);
- if (inputStream != null) {
- LOG.info("loading spark properties from:" + SPARK_DEFAULT_CONF_FILE);
- Properties properties = new Properties();
- properties.load(inputStream);
- for (String propertyName : properties.stringPropertyNames()) {
- if (propertyName.startsWith("spark")) {
- String value = properties.getProperty(propertyName);
- sparkConf.set(propertyName, properties.getProperty(propertyName));
- LOG.info(String.format(
- "load spark configuration from %s (%s -> %s).",
- SPARK_DEFAULT_CONF_FILE, propertyName, value));
- }
- }
- }
- } catch (IOException e) {
- LOG.info("Failed to open spark configuration file:"
- + SPARK_DEFAULT_CONF_FILE, e);
- } finally {
- if (inputStream != null) {
- try {
- inputStream.close();
- } catch (IOException e) {
- LOG.debug("Failed to close inputstream.", e);
- }
- }
- }
-
- // load properties from hive configurations.
- Iterator<Map.Entry<String, String>> iterator = hiveConf.iterator();
- while (iterator.hasNext()) {
- Map.Entry<String, String> entry = iterator.next();
- String propertyName = entry.getKey();
- if (propertyName.startsWith("spark")) {
- String value = entry.getValue();
- sparkConf.set(propertyName, value);
- LOG.info(String.format(
- "load spark configuration from hive configuration (%s -> %s).",
- propertyName, value));
- }
- }
-
- return sparkConf;
- }
-
- public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) throws Exception {
- Context ctx = driverContext.getCtx();
- HiveConf hiveConf = (HiveConf) ctx.getConf();
- refreshLocalResources(sparkWork, hiveConf);
- JobConf jobConf = new JobConf(hiveConf);
-
- // Create temporary scratch dir
- Path emptyScratchDir;
- emptyScratchDir = ctx.getMRTmpPath();
- FileSystem fs = emptyScratchDir.getFileSystem(jobConf);
- fs.mkdirs(emptyScratchDir);
-
- SparkCounters sparkCounters = new SparkCounters(sc, hiveConf);
- Map<String, List<String>> prefixes = sparkWork.getRequiredCounterPrefix();
- // register spark counters before submit spark job.
- if (prefixes != null) {
- for (String group : prefixes.keySet()) {
- for (String counter : prefixes.get(group)) {
- sparkCounters.createCounter(group, counter);
- }
- }
- }
- SparkReporter sparkReporter = new SparkReporter(sparkCounters);
-
- // Generate Spark plan
- SparkPlanGenerator gen =
- new SparkPlanGenerator(sc, ctx, jobConf, emptyScratchDir, sparkReporter);
- SparkPlan plan = gen.generate(sparkWork);
-
- // Execute generated plan.
- JavaPairRDD<HiveKey, BytesWritable> finalRDD = plan.generateGraph();
- // We use Spark RDD async action to submit job as it's the only way to get jobId now.
- JavaFutureAction<Void> future = finalRDD.foreachAsync(HiveVoidFunction.getInstance());
- // As we always use foreach action to submit RDD graph, it would only trigger on job.
- int jobId = future.jobIds().get(0);
- SimpleSparkJobStatus sparkJobStatus =
- new SimpleSparkJobStatus(jobId, jobStateListener, jobProgressListener, sparkCounters, future);
- return new SparkJobRef(jobId, sparkJobStatus);
- }
-
- /**
- * At this point single SparkContext is used by more than one thread, so make this
- * method synchronized.
- *
- * TODO: This method can't remove a jar/resource from SparkContext. Looks like this is an
- * issue we have to live with until multiple SparkContexts are supported in a single JVM.
- */
- private synchronized void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {
- // add hive-exec jar
- addJars((new JobConf(this.getClass())).getJar());
-
- // add aux jars
- addJars(HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS));
-
- // add added jars
- String addedJars = Utilities.getResourceFiles(conf, SessionState.ResourceType.JAR);
- HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDJARS, addedJars);
- addJars(addedJars);
-
- // add plugin module jars on demand
- // jobConf will hold all the configuration for hadoop, tez, and hive
- JobConf jobConf = new JobConf(conf);
- jobConf.set(MR_JAR_PROPERTY, "");
- for (BaseWork work : sparkWork.getAllWork()) {
- work.configureJobConf(jobConf);
- }
- addJars(conf.get(MR_JAR_PROPERTY));
-
- // add added files
- String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE);
- HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDFILES, addedFiles);
- addResources(addedFiles);
-
- // add added archives
- String addedArchives = Utilities.getResourceFiles(conf, SessionState.ResourceType.ARCHIVE);
- HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDARCHIVES, addedArchives);
- addResources(addedArchives);
- }
-
- private void addResources(String addedFiles) {
- for (String addedFile : CSV_SPLITTER.split(Strings.nullToEmpty(addedFiles))) {
- if (!localFiles.contains(addedFile)) {
- localFiles.add(addedFile);
- sc.addFile(addedFile);
- }
- }
- }
-
- private void addJars(String addedJars) {
- for (String addedJar : CSV_SPLITTER.split(Strings.nullToEmpty(addedJars))) {
- if (!localJars.contains(addedJar)) {
- localJars.add(addedJar);
- sc.addJar(addedJar);
- }
- }
- }
-
- public void close() {
- sc.stop();
- client = null;
- }
-}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 2fea62d..3613784 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -110,15 +110,17 @@ public int execute(DriverContext driverContext) {
SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
- sparkCounters = sparkJobStatus.getCounter();
- SparkJobMonitor monitor = new SparkJobMonitor(sparkJobStatus);
- monitor.startMonitor();
- SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics();
- if (LOG.isInfoEnabled() && sparkStatistics != null) {
- LOG.info(String.format("=====Spark Job[%d] statistics=====", jobRef.getJobId()));
- logSparkStatistic(sparkStatistics);
+ if (sparkJobStatus != null) {
+ sparkCounters = sparkJobStatus.getCounter();
+ SparkJobMonitor monitor = new SparkJobMonitor(sparkJobStatus);
+ monitor.startMonitor();
+ SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics();
+ if (LOG.isInfoEnabled() && sparkStatistics != null) {
+ LOG.info(String.format("=====Spark Job[%s] statistics=====", jobRef.getJobId()));
+ logSparkStatistic(sparkStatistics);
+ }
+ sparkJobStatus.cleanup();
}
- sparkJobStatus.cleanup();
rc = 0;
} catch (Exception e) {
LOG.error("Failed to execute spark task.", e);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index e3e6d16..2f09384 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -20,6 +20,12 @@
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.io.BytesWritable;
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+
/**
* Contains utilities methods used as part of Spark tasks
*/
@@ -41,4 +47,22 @@ public static BytesWritable copyBytesWritable(BytesWritable bw) {
copy.set(bw);
return copy;
}
+
+ public static URL getURL(String path) throws MalformedURLException {
+ URL url = null;
+ try {
+ if (path != null) {
+ URI uri = new URI(path);
+ if (uri.getScheme() != null) {
+ url = uri.toURL();
+ } else {
+ // if there is no scheme in the path, we assume it's a file on the local fs.
+ url = new File(path).toURI().toURL();
+ }
+ }
+ } catch (URISyntaxException e) {
+ // do nothing here, just return null if input path is not a valid URI.
+ }
+ return url;
+ }
}
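
A short usage sketch for getURL() above (the paths are hypothetical): a plain path with no scheme is resolved against the local file system, a path that already carries a scheme is converted directly, and an invalid URI yields null.

    import java.net.URL;
    import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;

    public class GetUrlSketch {
      public static void main(String[] args) throws Exception {
        // No scheme: treated as a file on the local file system.
        URL localJar = SparkUtilities.getURL("/tmp/my-udf.jar");
        // Explicit scheme: converted to a URL as-is.
        URL fileJar = SparkUtilities.getURL("file:///tmp/other-udf.jar");

        System.out.println(localJar); // a file: URL pointing at the local path
        System.out.println(fileJar);  // the original file: URI, scheme unchanged
      }
    }
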
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index 51e0510..1f7ed83 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -18,12 +18,17 @@
package org.apache.hadoop.hive.ql.exec.spark.session;
import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.DriverContext;
-import org.apache.hadoop.hive.ql.exec.spark.SparkClient;
+import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
+import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
import org.apache.hadoop.hive.ql.plan.SparkWork;
+import java.io.IOException;
import java.util.UUID;
/**
@@ -31,10 +36,12 @@
* SparkClient which is shared by all SparkSession instances.
*/
public class SparkSessionImpl implements SparkSession {
+ private static final Log LOG = LogFactory.getLog(SparkSessionImpl.class);
+
private HiveConf conf;
private boolean isOpen;
private final String sessionId;
- private SparkClient sparkClient;
+ private HiveSparkClient hiveSparkClient;
public SparkSessionImpl() {
sessionId = makeSessionId();
@@ -49,8 +56,9 @@ public void open(HiveConf conf) {
@Override
public SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) throws Exception {
Preconditions.checkState(isOpen, "Session is not open. Can't submit jobs.");
- sparkClient = SparkClient.getInstance(driverContext.getCtx().getConf());
- return sparkClient.execute(driverContext, sparkWork);
+ Configuration hiveConf = driverContext.getCtx().getConf();
+ hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(hiveConf);
+ return hiveSparkClient.execute(driverContext, sparkWork);
}
@Override
@@ -71,10 +79,14 @@ public String getSessionId() {
@Override
public void close() {
isOpen = false;
- if (sparkClient != null) {
- sparkClient.close();
+ if (hiveSparkClient != null) {
+ try {
+ hiveSparkClient.close();
+ } catch (IOException e) {
+ LOG.error("Failed to close spark session (" + sessionId + ").", e);
+ }
}
- sparkClient = null;
+ hiveSparkClient = null;
}
public static String makeSessionId() {
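
To show how the session change fits together, a minimal lifecycle sketch; DriverContext and SparkWork construction is elided, and the direct construction of SparkSessionImpl is only for illustration. Each submit() now goes through HiveSparkClientFactory, which picks the local or remote client based on spark.master, and close() disposes of that client, logging rather than propagating an IOException.

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.DriverContext;
    import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
    import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionImpl;
    import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
    import org.apache.hadoop.hive.ql.plan.SparkWork;

    public class SparkSessionLifecycleSketch {
      public static SparkJobRef runOnce(HiveConf conf, DriverContext driverContext,
          SparkWork sparkWork) throws Exception {
        SparkSession session = new SparkSessionImpl();
        session.open(conf);
        try {
          return session.submit(driverContext, sparkWork);
        } finally {
          session.close(); // closes the underlying HiveSparkClient; an IOException is only logged
        }
      }
    }
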
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java
index bf43b6e..d16d1b4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java
@@ -19,26 +19,26 @@
public class SparkJobRef {
- private int jobId;
+ private String jobId;
private SparkJobStatus sparkJobStatus;
public SparkJobRef() {}
- public SparkJobRef(int jobId) {
+ public SparkJobRef(String jobId) {
this.jobId = jobId;
}
- public SparkJobRef(int jobId, SparkJobStatus sparkJobStatus) {
+ public SparkJobRef(String jobId, SparkJobStatus sparkJobStatus) {
this.jobId = jobId;
this.sparkJobStatus = sparkJobStatus;
}
- public int getJobId() {
+ public String getJobId() {
return jobId;
}
- public void setJobId(int jobId) {
+ public void setJobId(String jobId) {
this.jobId = jobId;
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
index d4d14a3..f2bb15f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.optimizer.spark;
+import java.io.IOException;
import java.util.Stack;
import org.apache.commons.logging.Log;
@@ -26,7 +27,9 @@
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.spark.SparkClient;
+import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient;
+import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
+import org.apache.hadoop.hive.ql.exec.spark.LocalHiveSparkClient;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -35,6 +38,7 @@
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.spark.SparkException;
import scala.Tuple2;
/**
@@ -69,39 +73,54 @@ public Object process(Node nd, Stack<Node> stack,
context.getVisitedReduceSinks().add(sink);
+
if (desc.getNumReducers() <= 0) {
if (constantReducers > 0) {
LOG.info("Parallelism for reduce sink " + sink + " set by user to " + constantReducers);
desc.setNumReducers(constantReducers);
} else {
- long numberOfBytes = 0;
+ try {
+ // TODO: try to keep this working after the integration with the remote Spark context, so that we
+ // don't break tests; we should implement automatic reducer-number calculation for the remote Spark
+ // client and refactor this code later, tracked by HIVE-8855.
+ HiveSparkClient sparkClient = HiveSparkClientFactory.createHiveSparkClient(context.getConf());
+ if (sparkClient instanceof LocalHiveSparkClient) {
+ LocalHiveSparkClient localHiveSparkClient = (LocalHiveSparkClient)sparkClient;
+ long numberOfBytes = 0;
+
+ // we need to add up all the estimates from the siblings of this reduce sink
+ for (Operator<? extends OperatorDesc> sibling:
+ sink.getChildOperators().get(0).getParentOperators()) {
+ if (sibling.getStatistics() != null) {
+ numberOfBytes += sibling.getStatistics().getDataSize();
+ } else {
+ LOG.warn("No stats available from: " + sibling);
+ }
+ }
+
+ if (sparkMemoryAndCores == null) {
+ sparkMemoryAndCores = localHiveSparkClient.getMemoryAndCores();
+ }
+
+ // Divide it by 2 so that we can have more reducers
+ long bytesPerReducer = sparkMemoryAndCores._1.longValue() / 2;
+ int numReducers = Utilities.estimateReducers(numberOfBytes, bytesPerReducer,
+ maxReducers, false);
+
+ // If there are more cores, use the number of cores
+ int cores = sparkMemoryAndCores._2.intValue();
+ if (numReducers < cores) {
+ numReducers = cores;
+ }
+ LOG.info("Set parallelism for reduce sink " + sink + " to: " + numReducers);
+ desc.setNumReducers(numReducers);
- // we need to add up all the estimates from the siblings of this reduce sink
- for (Operator<? extends OperatorDesc> sibling:
- sink.getChildOperators().get(0).getParentOperators()) {
- if (sibling.getStatistics() != null) {
- numberOfBytes += sibling.getStatistics().getDataSize();
} else {
- LOG.warn("No stats available from: " + sibling);
+ sparkClient.close();
}
+ } catch (Exception e) {
+ LOG.warn("Failed to create spark client.", e);
}
-
- if (sparkMemoryAndCores == null) {
- sparkMemoryAndCores = SparkClient.getMemoryAndCores(context.getConf());
- }
-
- // Divide it by 2 so that we can have more reducers
- long bytesPerReducer = sparkMemoryAndCores._1.longValue() / 2;
- int numReducers = Utilities.estimateReducers(numberOfBytes, bytesPerReducer,
- maxReducers, false);
-
- // If there are more cores, use the number of cores
- int cores = sparkMemoryAndCores._2.intValue();
- if (numReducers < cores) {
- numReducers = cores;
- }
- LOG.info("Set parallelism for reduce sink " + sink + " to: " + numReducers);
- desc.setNumReducers(numReducers);
}
} else {
LOG.info("Number of reducers determined to be: " + desc.getNumReducers());
diff --git spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
index 8346b28..982adbd 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
@@ -27,7 +27,7 @@
* Defines the API for the Spark remote client.
*/
@InterfaceAudience.Private
-public interface SparkClient {
+public interface SparkClient extends Serializable {
/**
* Submits a job for asynchronous execution.
diff --git spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index 5af66ee..161182f 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -190,7 +190,7 @@ public void run() {
LOG.info("No spark.home provided, calling SparkSubmit directly.");
argv.add(new File(System.getProperty("java.home"), "bin/java").getAbsolutePath());
- if (master.startsWith("local") || master.startsWith("mesos") || master.endsWith("-client")) {
+ if (master.startsWith("local") || master.startsWith("mesos") || master.endsWith("-client") || master.startsWith("spark")) {
String mem = conf.get("spark.driver.memory");
if (mem != null) {
argv.add("-Xms" + mem);