Index: conf/hive-default.xml
===================================================================
--- conf/hive-default.xml (revision 1171283)
+++ conf/hive-default.xml (working copy)
@@ -551,6 +551,18 @@
+ hive.client.stats.publishers
+
+ Comma-separated list of statistics publishers to be invoked on counters on each job. A client stats publisher is specified as the name of a Java class which implements the org.apache.hadoop.hive.ql.stats.ClientStatsPublisher interface.
+
+
+
+ hive.client.stats.counters
+
+ Subset of counters that should be of interest for hive.client.stats.publishers (when one wants to limit their publishing). Non-display names should be used.
+
+
+
hive.merge.mapfiles
true
Merge small files at the end of a map-only job
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1171283)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -123,6 +123,7 @@
PREEXECHOOKS("hive.exec.pre.hooks", ""),
POSTEXECHOOKS("hive.exec.post.hooks", ""),
ONFAILUREHOOKS("hive.exec.failure.hooks", ""),
+ CLIENTSTATSPUBLISHERS("hive.client.stats.publishers", ""),
EXECPARALLEL("hive.exec.parallel", false), // parallel query launching
EXECPARALLETHREADNUMBER("hive.exec.parallel.thread.number", 8),
HIVESPECULATIVEEXECREDUCERS("hive.mapred.reduce.tasks.speculative.execution", true),
@@ -410,8 +411,9 @@
3000), // # milliseconds to wait before the next retry
HIVE_STATS_COLLECT_RAWDATASIZE("hive.stats.collect.rawdatasize", true),
// should the raw data size be collected when analayzing tables
+ CLIENT_STATS_COUNTERS("hive.client.stats.counters", ""),
+ // Subset of counters that should be of interest for hive.client.stats.publishers (when one wants to limit their publishing). Non-display names should be used.
-
// Concurrency
HIVE_SUPPORT_CONCURRENCY("hive.support.concurrency", false),
HIVE_LOCK_MANAGER("hive.lock.manager", "org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager"),
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java (revision 1171283)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java (working copy)
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.io.Serializable;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.HashMap;
@@ -30,6 +31,9 @@
import java.util.Set;
import java.util.Enumeration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.MapRedStats;
import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter;
@@ -38,6 +42,7 @@
import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.ql.stats.ClientStatsPublisher;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
@@ -54,6 +59,8 @@
public class HadoopJobExecHelper {
+ static final private Log LOG = LogFactory.getLog(HadoopJobExecHelper.class.getName());
+
protected transient JobConf job;
protected Task extends Serializable> task;
@@ -225,6 +232,7 @@
long cpuMsec = -1;
int numMap = -1;
int numReduce = -1;
+ List clientStatPublishers = getClientStatPublishers();
while (!rj.isComplete()) {
try {
@@ -363,6 +371,14 @@
}
}
+ //Prepare data for Client Stat Publishers (if any present) and execute them
+ if (clientStatPublishers.size() > 0){
+ Map exctractedCounters = extractAllCounterValues(ctrs);
+ for(ClientStatsPublisher clientStatPublisher : clientStatPublishers){
+ clientStatPublisher.run(exctractedCounters, rj.getID().toString());
+ }
+ }
+
Counter counterCpuMsec = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
"CPU_MILLISECONDS");
if (counterCpuMsec != null) {
@@ -704,6 +720,7 @@
if (SessionState.get() != null) {
SessionState.get().getLastMapRedStatsList().add(mapRedStats);
}
+
boolean success = mapRedStats.isSuccess();
String statusMesg = getJobEndMsg(rj.getJobID());
@@ -728,4 +745,40 @@
return returnVal;
}
+
+ /**
+  * Flattens the counters of a job into a single name-to-value map.
+  *
+  * Each key is the counter group's name and the counter's name joined by "::";
+  * each value is the counter's current value converted to a Double.
+  *
+  * @param counters the job counters to read
+  * @return a new map with one entry per distinct "group::counter" key; empty when
+  *         counters is null
+  */
+ private Map<String, Double> extractAllCounterValues(Counters counters) {
+   Map<String, Double> extractedCounters = new HashMap<String, Double>();
+   if (counters == null) {
+     // Defensive: hand publishers an empty map rather than failing here.
+     return extractedCounters;
+   }
+   for (Counters.Group group : counters) {
+     for (Counter counter : group) {
+       extractedCounters.put(group.getName() + "::" + counter.getName(),
+           Double.valueOf(counter.getCounter()));
+     }
+   }
+   return extractedCounters;
+ }
+
+ /**
+  * Instantiates the client statistics publishers named in the job configuration.
+  *
+  * Reads the comma-separated class list from HiveConf.ConfVars.CLIENTSTATSPUBLISHERS,
+  * loads each class through JavaUtils.getClassLoader() and creates it with its
+  * no-argument constructor. A class that cannot be loaded or instantiated is logged
+  * and skipped; a RuntimeException is rethrown to the caller.
+  *
+  * @return the publishers that could be created; empty when none are configured
+  */
+ private List<ClientStatsPublisher> getClientStatPublishers() {
+   List<ClientStatsPublisher> clientStatsPublishers = new ArrayList<ClientStatsPublisher>();
+   String confString = HiveConf.getVar(job, HiveConf.ConfVars.CLIENTSTATSPUBLISHERS);
+   if (confString == null) {
+     return clientStatsPublishers;
+   }
+   confString = confString.trim();
+   if (confString.equals("")) {
+     return clientStatsPublishers;
+   }
+
+   for (String rawClassName : confString.split(",")) {
+     String className = rawClassName.trim();
+     if (className.equals("")) {
+       // Skip blank entries (e.g. "a,,b") instead of warning about a class named "".
+       continue;
+     }
+     try {
+       clientStatsPublishers.add((ClientStatsPublisher) Class.forName(
+           className, true, JavaUtils.getClassLoader()).newInstance());
+     } catch (RuntimeException e) {
+       throw e;
+     } catch (Exception e) {
+       LOG.warn(e.getClass().getName() + " occured when trying to create class: "
+           + className + " implementing ClientStatsPublisher interface");
+       // Pass the exception itself so its stack trace reaches the log.
+       LOG.warn("The exception message is: " + e.getMessage(), e);
+       LOG.warn("Program will continue, but without this ClientStatsPublisher working");
+     }
+   }
+   return clientStatsPublishers;
+ }
}
Index: ql/src/java/org/apache/hadoop/hive/ql/stats/ClientStatsPublisher.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/stats/ClientStatsPublisher.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/stats/ClientStatsPublisher.java (revision 0)
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.stats;
+
+import java.util.Map;
+
+/** Receives job counter values from the Hive client; created via its no-arg constructor. */
+public interface ClientStatsPublisher {
+  /** Publishes counterValues (keyed "group::counter") for the job identified by jobID. */
+  public void run(Map<String, Double> counterValues, String jobID);
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java (revision 1171283)
+++ ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java (working copy)
@@ -299,7 +299,7 @@
}
/**
- * Called at the start of job Driver.run().
+ * Called at the start of job Driver.execute().
*/
public void startQuery(String cmd, String id) {
SessionState ss = SessionState.get();