diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounter.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounter.java
new file mode 100644
index 0000000..cdf2e10
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounter.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.counter;
+
+import java.io.Serializable;
+
+import org.apache.spark.Accumulator;
+import org.apache.spark.AccumulatorParam;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public class SparkCounter implements Serializable {
+
+ private String name;
+ private String displayName;
+ private Accumulator accumulator;
+
+ public SparkCounter(
+ String name,
+ String displayName,
+ String groupName,
+ long initValue,
+ JavaSparkContext sparkContext) {
+
+ this.name = name;
+ this.displayName = displayName;
+ LongAccumulatorParam longParam = new LongAccumulatorParam();
+ String accumulatorName = groupName + "_" + name;
+ this.accumulator = sparkContext.accumulator(initValue, accumulatorName, longParam);
+ }
+
+ public long getValue() {
+ return accumulator.value();
+ }
+
+ public void increment(long incr) {
+ accumulator.add(incr);
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ public void setDisplayName(String displayName) {
+ this.displayName = displayName;
+ }
+
+ class LongAccumulatorParam implements AccumulatorParam {
+
+ @Override
+ public Long addAccumulator(Long t1, Long t2) {
+ return t1 + t2;
+ }
+
+ @Override
+ public Long addInPlace(Long r1, Long r2) {
+ return r1 + r2;
+ }
+
+ @Override
+ public Long zero(Long initialValue) {
+ return 0L;
+ }
+ }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounterGroup.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounterGroup.java
new file mode 100644
index 0000000..49a0978
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounterGroup.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.counter;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.util.ResourceBundles;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * A counter group folds together all counters of the same kind.
+ */
+public class SparkCounterGroup implements Serializable {
+
+ private String groupName;
+ private String groupDisplayName;
+ private Map sparkCounters;
+
+ private transient JavaSparkContext javaSparkContext;
+
+ public SparkCounterGroup(
+ String groupName,
+ String groupDisplayName,
+ JavaSparkContext javaSparkContext) {
+
+ this.groupName = groupName;
+ this.groupDisplayName = groupDisplayName;
+ this.javaSparkContext = javaSparkContext;
+ sparkCounters = new HashMap();
+ }
+
+ public void createCounter(String name, long initValue) {
+ String displayName = ResourceBundles.getCounterGroupName(name, name);
+ SparkCounter counter = new SparkCounter(name, displayName, groupName, initValue, javaSparkContext);
+ sparkCounters.put(name, counter);
+ }
+
+ public SparkCounter getCounter(String name) {
+ return sparkCounters.get(name);
+ }
+
+ public String getGroupName() {
+ return groupName;
+ }
+
+ public String getGroupDisplayName() {
+ return groupDisplayName;
+ }
+
+ public void setGroupDisplayName(String groupDisplayName) {
+ this.groupDisplayName = groupDisplayName;
+ }
+
+ public Map getSparkCounters() {
+ return sparkCounters;
+ }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounters.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounters.java
new file mode 100644
index 0000000..dc315b2
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounters.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.counter;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.MapOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.mapreduce.util.ResourceBundles;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * SparkCounters collects Hive operator metrics through Spark accumulators. Spark accumulators
+ * have a few limitations:
+ * 1. An accumulator must be created on the Spark context (driver) side.
+ * 2. Spark tasks can only increment the metric value.
+ * 3. The accumulator value can only be read on the Spark context (driver) side.
+ * This Spark counter API is designed to fit Hive's requirements, with several access
+ * restrictions that follow from the accumulator limitations mentioned above:
+ * 1. A counter must be created on the driver side if it will be accessed in a task.
+ * 2. increment may only be invoked on the task side.
+ * 3. Hive can only read a counter's value on the driver side.
+ */
+public class SparkCounters implements Serializable {
+
+ private Map sparkCounterGroups;
+
+ private JavaSparkContext javaSparkContext;
+ private Configuration hiveConf;
+
+ public SparkCounters(JavaSparkContext javaSparkContext, Configuration hiveConf) {
+ this.javaSparkContext = javaSparkContext;
+ this.hiveConf = hiveConf;
+ sparkCounterGroups = new HashMap();
+ initializeSparkCounters();
+ }
+
+ /**
+ * pre-define all needed Counters here.
+ */
+ private void initializeSparkCounters() {
+ createCounter(HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVECOUNTERGROUP),
+ Operator.HIVECOUNTERCREATEDFILES);
+ createCounter(MapOperator.Counter.DESERIALIZE_ERRORS);
+ }
+
+ public void createCounter(Enum> key) {
+ createCounter(key.getDeclaringClass().getName(), key.name());
+ }
+
+ public void createCounter(String groupName, String counterName) {
+ createCounter(groupName, counterName, 0L);
+ }
+
+ public void createCounter(String groupName, String counterName, long initValue) {
+ getGroup(groupName).createCounter(counterName, initValue);
+ }
+
+ public void increment(Enum> key, long incrValue) {
+ increment(key.getDeclaringClass().getName(), key.name(), incrValue);
+ }
+
+ public void increment(String groupName, String counterName, long value) {
+ SparkCounter counter = getGroup(groupName).getCounter(counterName);
+ if (counter == null) {
+ throw new RuntimeException(
+ String.format("counter[%s, %s] has not initialized before.", groupName, counterName));
+ }
+ counter.increment(value);
+ }
+
+ public long getValue(String groupName, String counterName) {
+ SparkCounter counter = getGroup(groupName).getCounter(counterName);
+ if (counter == null) {
+ throw new RuntimeException(
+ String.format("counter[%s, %s] has not initialized before.", groupName, counterName));
+ }
+
+ return counter.getValue();
+ }
+
+ public SparkCounter getCounter(String groupName, String counterName) {
+ return getGroup(groupName).getCounter(counterName);
+ }
+
+ public SparkCounter getCounter(Enum> key) {
+ return getCounter(key.getDeclaringClass().getName(), key.name());
+ }
+
+ private SparkCounterGroup getGroup(String groupName) {
+ SparkCounterGroup group = sparkCounterGroups.get(groupName);
+ if (group == null) {
+ String groupDisplayName = ResourceBundles.getCounterGroupName(groupName, groupName);
+ group = new SparkCounterGroup(groupName, groupDisplayName, javaSparkContext);
+ sparkCounterGroups.put(groupName, group);
+ }
+ return group;
+ }
+
+ public Map getSparkCounterGroups() {
+ return sparkCounterGroups;
+ }
+}