diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java index 5fce423..3a89ca6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.hadoop.hive.ql.exec.spark; import org.apache.commons.compress.utils.CharsetNames; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java index 044cf7c..fe6bce1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java @@ -20,13 +20,14 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.SparkWork; public interface SparkSession { /** * Initializes a Spark session for DAG execution. */ - public void open(HiveConf conf); + public void open(HiveConf conf) throws HiveException; /** * Submit given sparkWork to SparkClient diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java index 1f7ed83..650ac11 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java @@ -26,15 +26,13 @@ import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory; import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient; import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.SparkWork; +import org.apache.spark.SparkException; import java.io.IOException; import java.util.UUID; -/** - * Simple implementation of SparkSession which currently just submits jobs to - * SparkClient which is shared by all SparkSession instances. - */ public class SparkSessionImpl implements SparkSession { private static final Log LOG = LogFactory.getLog(SparkSession.class); @@ -48,16 +46,19 @@ public SparkSessionImpl() { } @Override - public void open(HiveConf conf) { + public void open(HiveConf conf) throws HiveException { this.conf = conf; isOpen = true; + try { + hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf); + } catch (Exception e) { + throw new HiveException("Failed to create spark client.", e); + } } @Override public SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) throws Exception { Preconditions.checkState(isOpen, "Session is not open. Can't submit jobs."); - Configuration hiveConf = driverContext.getCtx().getConf(); - hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(hiveConf); return hiveSparkClient.execute(driverContext, sparkWork); }