diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounter.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounter.java new file mode 100644 index 0000000..cdf2e10 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounter.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.spark.counter; + +import java.io.Serializable; + +import org.apache.spark.Accumulator; +import org.apache.spark.AccumulatorParam; +import org.apache.spark.api.java.JavaSparkContext; + +public class SparkCounter implements Serializable { + + private String name; + private String displayName; + private Accumulator accumulator; + + public SparkCounter( + String name, + String displayName, + String groupName, + long initValue, + JavaSparkContext sparkContext) { + + this.name = name; + this.displayName = displayName; + LongAccumulatorParam longParam = new LongAccumulatorParam(); + String accumulatorName = groupName + "_" + name; + this.accumulator = sparkContext.accumulator(initValue, accumulatorName, longParam); + } + + public long getValue() { + return accumulator.value(); + } + + public void increment(long incr) { + accumulator.add(incr); + } + + public String getName() { + return name; + } + + public String getDisplayName() { + return displayName; + } + + public void setDisplayName(String displayName) { + this.displayName = displayName; + } + + class LongAccumulatorParam implements AccumulatorParam { + + @Override + public Long addAccumulator(Long t1, Long t2) { + return t1 + t2; + } + + @Override + public Long addInPlace(Long r1, Long r2) { + return r1 + r2; + } + + @Override + public Long zero(Long initialValue) { + return 0L; + } + } + +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounterGroup.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounterGroup.java new file mode 100644 index 0000000..49a0978 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounterGroup.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.spark.counter; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.mapreduce.util.ResourceBundles; +import org.apache.spark.api.java.JavaSparkContext; + +/** + * We use group to fold all the same kind of counters. + */ +public class SparkCounterGroup implements Serializable { + + private String groupName; + private String groupDisplayName; + private Map sparkCounters; + + private transient JavaSparkContext javaSparkContext; + + public SparkCounterGroup( + String groupName, + String groupDisplayName, + JavaSparkContext javaSparkContext) { + + this.groupName = groupName; + this.groupDisplayName = groupDisplayName; + this.javaSparkContext = javaSparkContext; + sparkCounters = new HashMap(); + } + + public void createCounter(String name, long initValue) { + String displayName = ResourceBundles.getCounterGroupName(name, name); + SparkCounter counter = new SparkCounter(name, displayName, groupName, initValue, javaSparkContext); + sparkCounters.put(name, counter); + } + + public SparkCounter getCounter(String name) { + return sparkCounters.get(name); + } + + public String getGroupName() { + return groupName; + } + + public String getGroupDisplayName() { + return groupDisplayName; + } + + public void setGroupDisplayName(String groupDisplayName) { + this.groupDisplayName = groupDisplayName; + } + + public Map getSparkCounters() { + return sparkCounters; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounters.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounters.java new file mode 100644 index 0000000..dc315b2 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounters.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.spark.counter; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.MapOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.mapreduce.util.ResourceBundles; +import org.apache.spark.api.java.JavaSparkContext; + +/** + * SparkCounters is used to collect Hive operator metric through Spark accumulator. There are few + * limitation of Spark accumulator, like: + * 1. accumulator should be created at Spark context side. + * 2. Spark tasks can only increment metric count. + * 3. User can only get accumulator value at Spark context side. + * These Spark Counter API is designed to fit into Hive requirement, while with several access + * restriction due to Spark accumulator previous mentioned: + * 1. Counter should be created on driver side if it would be accessed in task. + * 2. increment could only be invoked task side. + * 3. Hive could only get Counter value at driver side. + */ +public class SparkCounters implements Serializable { + + private Map sparkCounterGroups; + + private JavaSparkContext javaSparkContext; + private Configuration hiveConf; + + public SparkCounters(JavaSparkContext javaSparkContext, Configuration hiveConf) { + this.javaSparkContext = javaSparkContext; + this.hiveConf = hiveConf; + sparkCounterGroups = new HashMap(); + initializeSparkCounters(); + } + + /** + * pre-define all needed Counters here. + */ + private void initializeSparkCounters() { + createCounter(HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVECOUNTERGROUP), + Operator.HIVECOUNTERCREATEDFILES); + createCounter(MapOperator.Counter.DESERIALIZE_ERRORS); + } + + public void createCounter(Enum key) { + createCounter(key.getDeclaringClass().getName(), key.name()); + } + + public void createCounter(String groupName, String counterName) { + createCounter(groupName, counterName, 0L); + } + + public void createCounter(String groupName, String counterName, long initValue) { + getGroup(groupName).createCounter(counterName, initValue); + } + + public void increment(Enum key, long incrValue) { + increment(key.getDeclaringClass().getName(), key.name(), incrValue); + } + + public void increment(String groupName, String counterName, long value) { + SparkCounter counter = getGroup(groupName).getCounter(counterName); + if (counter == null) { + throw new RuntimeException( + String.format("counter[%s, %s] has not initialized before.", groupName, counterName)); + } + counter.increment(value); + } + + public long getValue(String groupName, String counterName) { + SparkCounter counter = getGroup(groupName).getCounter(counterName); + if (counter == null) { + throw new RuntimeException( + String.format("counter[%s, %s] has not initialized before.", groupName, counterName)); + } + + return counter.getValue(); + } + + public SparkCounter getCounter(String groupName, String counterName) { + return getGroup(groupName).getCounter(counterName); + } + + public SparkCounter getCounter(Enum key) { + return getCounter(key.getDeclaringClass().getName(), key.name()); + } + + private SparkCounterGroup getGroup(String groupName) { + SparkCounterGroup group = sparkCounterGroups.get(groupName); + if (group == null) { + String groupDisplayName = ResourceBundles.getCounterGroupName(groupName, groupName); + group = new SparkCounterGroup(groupName, groupDisplayName, javaSparkContext); + sparkCounterGroups.put(groupName, group); + } + return group; + } + + public Map getSparkCounterGroups() { + return sparkCounterGroups; + } +}