diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java
index 7cfd43d..65f34e5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java
@@ -6,15 +6,14 @@
 import org.apache.spark.TaskContext;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 
-
 public abstract class HivePairFlatMapFunction implements PairFlatMapFunction {
 
   protected transient JobConf jobConf;
 
   private byte[] buffer;
 
-  protected static final NumberFormat taskIdFormat = NumberFormat.getInstance();
-  protected static final NumberFormat stageIdFormat = NumberFormat.getInstance();
+  private static final NumberFormat taskIdFormat = NumberFormat.getInstance();
+  private static final NumberFormat stageIdFormat = NumberFormat.getInstance();
 
   static {
     taskIdFormat.setGroupingUsed(false);
@@ -42,7 +41,7 @@ private void setupMRLegacyConfigs() {
     StringBuilder taskAttemptIdBuilder = new StringBuilder("attempt_");
     taskAttemptIdBuilder.append(System.currentTimeMillis())
         .append("_")
-        .append(stageIdFormat.format(TaskContext.get().getStageId()))
+        .append(stageIdFormat.format(TaskContext.get().stageId()))
         .append("_");
 
     if (isMap()) {
@@ -53,12 +52,12 @@ private void setupMRLegacyConfigs() {
 
     // Spark task attempt id is increased by Spark context instead of task, which may introduce
     // unstable qtest output, since non Hive features depends on this, we always set it to 0 here.
-    taskAttemptIdBuilder.append(taskIdFormat.format(TaskContext.get().getPartitionId()))
+    taskAttemptIdBuilder.append(taskIdFormat.format(TaskContext.get().partitionId()))
        .append("_0");
 
     String taskAttemptIdStr = taskAttemptIdBuilder.toString();
     jobConf.set("mapred.task.id", taskAttemptIdStr);
     jobConf.set("mapreduce.task.attempt.id", taskAttemptIdStr);
-    jobConf.setInt("mapred.task.partition", TaskContext.get().getPartitionId());
+    jobConf.setInt("mapred.task.partition", TaskContext.get().partitionId());
   }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
index 39af1d1..0b74552 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
@@ -38,9 +38,8 @@
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.spark.FutureAction;
-import org.apache.spark.SimpleFutureAction;
 import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.ui.jobs.JobProgressListener;
@@ -181,11 +180,14 @@ public int execute(DriverContext driverContext, SparkWork sparkWork) {
     try {
       JavaPairRDD finalRDD = plan.generateGraph();
       // We use Spark RDD async action to submit job as it's the only way to get jobId now.
-      FutureAction future = finalRDD.foreachAsync(HiveVoidFunction.getInstance());
+      JavaFutureAction<Void> future = finalRDD.foreachAsync(HiveVoidFunction.getInstance());
       // An action may trigger multi jobs in Spark, we only monitor the latest job here
       // until we found that Hive does trigger multi jobs.
+      List<Integer> jobIds = future.jobIds();
+      // jobIds size is always greater than or equal to 1.
+      int jobId = jobIds.get(jobIds.size() - 1);
       SimpleSparkJobStatus sparkJobStatus = new SimpleSparkJobStatus(
-        (Integer) future.jobIds().last(), jobStateListener, jobProgressListener);
+        jobId, jobStateListener, jobProgressListener);
       SparkJobMonitor monitor = new SparkJobMonitor(sparkJobStatus);
       monitor.startMonitor();
     } catch (Exception e) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index a0d7b1a..6ac72ce 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -25,15 +25,6 @@
  * Contains utilities methods used as part of Spark tasks
  */
 public class SparkUtilities {
-  public static void setTaskInfoInJobConf(JobConf jobConf, TaskContext taskContext) {
-    // Set mapred.task.partition in executor side.
-    jobConf.setInt("mapred.task.partition", taskContext.getPartitionId());
-
-    // Set mapred.task.id as taskId_attemptId. The taskId is 6 digits in length (prefixed with 0 if
-    // necessary). Similarly attemptId is two digits in length.
-    jobConf.set("mapred.task.id",
-      String.format("%06d_%02d", taskContext.getPartitionId(), taskContext.getAttemptId()));
-  }
 
   public static BytesWritable copyBytesWritable(BytesWritable bw) {
     BytesWritable copy = new BytesWritable();
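For reference, a minimal standalone sketch of the JavaFutureAction pattern the SparkClient change adopts (not part of the patch; the class name and the local master setting are illustrative assumptions): an async RDD action returns a JavaFutureAction, and its jobIds() list identifies the Spark jobs triggered by that action, so the last entry is the most recently submitted job to monitor.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

// Illustrative class name; uses a local Spark master for demonstration only.
public class AsyncJobIdSketch {
  public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf().setAppName("async-job-id-sketch").setMaster("local[2]");
    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3));

    // foreachAsync submits the job without blocking and hands back a JavaFutureAction.
    JavaFutureAction<Void> future = rdd.foreachAsync(new VoidFunction<Integer>() {
      @Override
      public void call(Integer ignored) {
        // no-op action; only the job id is of interest here
      }
    });

    // jobIds() lists the ids of the Spark jobs triggered by this action;
    // the patch monitors the last (most recently submitted) one.
    List<Integer> jobIds = future.jobIds();
    int jobId = jobIds.get(jobIds.size() - 1);

    future.get(); // wait for the action to finish
    System.out.println("Monitored Spark job id: " + jobId);
    sc.stop();
  }
}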