diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
index 3fd8e47..90736c9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
@@ -21,6 +21,7 @@
 import java.util.Iterator;
 
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
 import org.apache.hadoop.io.BytesWritable;
@@ -36,8 +37,8 @@
 
   private static final long serialVersionUID = 1L;
 
-  public HiveMapFunction(byte[] buffer) {
-    super(buffer);
+  public HiveMapFunction(byte[] jobConfBuffer, SparkReporter sparkReporter) {
+    super(jobConfBuffer, sparkReporter);
   }
 
   @Override
@@ -56,7 +57,7 @@ public HiveMapFunction(byte[] buffer) {
     HiveMapFunctionResultList result =
         new HiveMapFunctionResultList(jobConf, it, mapRecordHandler);
     //TODO we need to implement a Spark specified Reporter to collect stats, refer to HIVE-7709.
-    mapRecordHandler.init(jobConf, result, Reporter.NULL);
+    mapRecordHandler.init(jobConf, result, sparkReporter);
 
     return result;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java
index 7cfd43d..c8fa3d9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java
@@ -9,13 +9,8 @@ public abstract class HivePairFlatMapFunction implements PairFlatMapFunction {
-  protected transient JobConf jobConf;
-
-  private byte[] buffer;
-
   protected static final NumberFormat taskIdFormat = NumberFormat.getInstance();
   protected static final NumberFormat stageIdFormat = NumberFormat.getInstance();
-
   static {
     taskIdFormat.setGroupingUsed(false);
     taskIdFormat.setMinimumIntegerDigits(6);
@@ -23,8 +18,14 @@
     stageIdFormat.setMinimumIntegerDigits(4);
   }
 
-  public HivePairFlatMapFunction(byte[] buffer) {
+  protected transient JobConf jobConf;
+  protected SparkReporter sparkReporter;
+
+  private byte[] buffer;
+
+  public HivePairFlatMapFunction(byte[] buffer, SparkReporter sparkReporter) {
     this.buffer = buffer;
+    this.sparkReporter = sparkReporter;
   }
 
   protected void initJobConf() {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
index 5153885..3c7d85c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
@@ -35,8 +35,8 @@
 
   private static final long serialVersionUID = 1L;
 
-  public HiveReduceFunction(byte[] buffer) {
-    super(buffer);
+  public HiveReduceFunction(byte[] buffer, SparkReporter sparkReporter) {
+    super(buffer, sparkReporter);
   }
 
   @Override
@@ -47,7 +47,7 @@ public HiveReduceFunction(byte[] buffer) {
     SparkReduceRecordHandler reducerRecordhandler = new SparkReduceRecordHandler();
     HiveReduceFunctionResultList result =
         new HiveReduceFunctionResultList(jobConf, it, reducerRecordhandler);
-    reducerRecordhandler.init(jobConf, result, Reporter.NULL);
+    reducerRecordhandler.init(jobConf, result, sparkReporter);
 
     return result;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
index 39af1d1..505ba35 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
@@ -29,6 +29,9 @@
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounter;
+import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounterGroup;
+import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobMonitor;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.JobStateListener;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.SimpleSparkJobStatus;
@@ -166,9 +169,12 @@ public int execute(DriverContext driverContext, SparkWork sparkWork) {
       return 5;
     }
 
+    SparkCounters sparkCounters = new SparkCounters(sc, hiveConf);
+    SparkReporter sparkReporter = new SparkReporter(sparkCounters);
+
     // Generate Spark plan
-    SparkPlanGenerator gen = new SparkPlanGenerator(sc, ctx, jobConf,
-        emptyScratchDir);
+    SparkPlanGenerator gen =
+        new SparkPlanGenerator(sc, ctx, jobConf, emptyScratchDir, sparkReporter);
     SparkPlan plan;
     try {
       plan = gen.generate(sparkWork);
@@ -192,6 +198,7 @@ public int execute(DriverContext driverContext, SparkWork sparkWork) {
       LOG.error("Error executing Spark Plan", e);
       return 1;
     }
+
     return 0;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
index 20ea977..2ed979a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
@@ -96,6 +96,7 @@ public void init(JobConf job, OutputCollector output, Reporter reporter) {
       execContext.setLocalWork(localWork);
 
       MapredContext.init(true, new JobConf(jc));
+      MapredContext.get().setReporter(reporter);
 
       mo.setExecContext(execContext);
       mo.initializeLocalWork(jc);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 126cb9f..c6a8773 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -27,6 +27,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileOutputFormat;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
@@ -64,13 +65,15 @@
   private final JobConf jobConf;
   private Context context;
   private Path scratchDir;
+  private SparkReporter sparkReporter;
 
   public SparkPlanGenerator(JavaSparkContext sc, Context context,
-      JobConf jobConf, Path scratchDir) {
+      JobConf jobConf, Path scratchDir, SparkReporter sparkReporter) {
     this.sc = sc;
     this.context = context;
     this.jobConf = jobConf;
     this.scratchDir = scratchDir;
+    this.sparkReporter = sparkReporter;
   }
 
   public SparkPlan generate(SparkWork sparkWork) throws Exception {
@@ -169,7 +172,7 @@ private MapTran generate(MapWork mw) throws Exception {
     MapTran result = new MapTran();
     JobConf newJobConf = cloneJobConf(mw);
     byte[] confBytes = KryoSerializer.serializeJobConf(newJobConf);
-    HiveMapFunction mapFunc = new HiveMapFunction(confBytes);
+    HiveMapFunction mapFunc = new HiveMapFunction(confBytes, sparkReporter);
     result.setMapFunction(mapFunc);
     return result;
   }
@@ -178,7 +181,7 @@ private ReduceTran generate(ReduceWork rw) throws Exception {
     ReduceTran result = new ReduceTran();
     JobConf newJobConf = cloneJobConf(rw);
     byte[] confBytes = KryoSerializer.serializeJobConf(newJobConf);
-    HiveReduceFunction redFunc = new HiveReduceFunction(confBytes);
+    HiveReduceFunction redFunc = new HiveReduceFunction(confBytes, sparkReporter);
     result.setReduceFunction(redFunc);
     return result;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReporter.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReporter.java
new file mode 100644
index 0000000..7ea7577
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReporter.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.Serializable;
+
+/**
+ * Implement SparkReporter for Hive operator level statistics collection, and throw
+ * UnsupportedOperationException for other unrelated methods, so if any Hive feature
+ * depends on these unimplemented methods, we could go back here quickly and enable it.
+ */
+public class SparkReporter implements Reporter, Serializable {
+
+  private SparkCounters sparkCounters;
+
+  public SparkReporter(SparkCounters sparkCounters) {
+    this.sparkCounters = sparkCounters;
+  }
+
+  @Override
+  public void setStatus(String status) {
+    throw new UnsupportedOperationException("do not support this method now.");
+  }
+
+  @Override
+  public Counter getCounter(Enum<?> name) {
+    throw new UnsupportedOperationException("do not support this method now.");
+  }
+
+  @Override
+  public Counter getCounter(String group, String name) {
+    throw new UnsupportedOperationException("do not support this method now.");
+  }
+
+  @Override
+  public void incrCounter(Enum<?> key, long amount) {
+    sparkCounters.increment(key.getDeclaringClass().getName(), key.name(), amount);
+  }
+
+  @Override
+  public void incrCounter(String group, String counter, long amount) {
+    sparkCounters.increment(group, counter, amount);
+  }
+
+  @Override
+  public InputSplit getInputSplit() throws UnsupportedOperationException {
+    throw new UnsupportedOperationException("do not support this method now.");
+  }
+
+  @Override
+  public float getProgress() {
+    throw new UnsupportedOperationException("do not support this method now.");
+  }
+
+  @Override
+  public void progress() {
+    //do not support task level progress, do nothing here.
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounters.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounters.java
index 3c7eb99..b5d26ab 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounters.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounters.java
@@ -23,8 +23,10 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.MapOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ScriptOperator;
 import org.apache.hadoop.mapreduce.util.ResourceBundles;
 
 import org.apache.spark.api.java.JavaSparkContext;
@@ -61,6 +63,10 @@ private void initializeSparkCounters() {
     createCounter(HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVECOUNTERGROUP),
         Operator.HIVECOUNTERCREATEDFILES);
     createCounter(MapOperator.Counter.DESERIALIZE_ERRORS);
+    createCounter(FilterOperator.Counter.FILTERED);
+    createCounter(FilterOperator.Counter.PASSED);
+    createCounter(ScriptOperator.Counter.DESERIALIZE_ERRORS);
+    createCounter(ScriptOperator.Counter.SERIALIZE_ERRORS);
   }
 
   public void createCounter(Enum<?> key) {
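Note (illustration only, not part of the patch): a minimal sketch of how the pieces above fit together, assuming only the constructors and methods visible in the diff -- SparkCounters(JavaSparkContext, HiveConf), SparkReporter(SparkCounters), Reporter.incrCounter(...) and SparkCounters.increment(...). The class name SparkReporterSketch and the "MY_GROUP"/"MY_COUNTER" strings are hypothetical.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.MapOperator;
import org.apache.hadoop.hive.ql.exec.spark.SparkReporter;
import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkReporterSketch {
  public static void demo(JavaSparkContext sc, HiveConf hiveConf) {
    // Driver side (SparkClient.execute): counters are registered against the Spark
    // context, then a serializable SparkReporter wrapping them is handed to
    // SparkPlanGenerator and shipped to executors inside each Hive*Function.
    SparkCounters sparkCounters = new SparkCounters(sc, hiveConf);
    SparkReporter sparkReporter = new SparkReporter(sparkCounters);

    // Executor side (Spark*RecordHandler.init): operators only see a plain
    // org.apache.hadoop.mapred.Reporter, so their existing incrCounter() calls are
    // routed into SparkCounters instead of being dropped on Reporter.NULL.
    sparkReporter.incrCounter(MapOperator.Counter.DESERIALIZE_ERRORS, 1L);
    sparkReporter.incrCounter("MY_GROUP", "MY_COUNTER", 5L);
  }
}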