Index: conf/hive-default.xml
===================================================================
--- conf/hive-default.xml (revision 1171283)
+++ conf/hive-default.xml (working copy)
@@ -551,6 +551,18 @@
 </property>
 
 <property>
+  <name>hive.client.stats.publishers</name>
+  <value></value>
+  <description>Comma-separated list of statistics publishers to be invoked on counters on each job. A client stats publisher is specified as the name of a Java class which implements the org.apache.hadoop.hive.ql.stats.ClientStatsPublisher interface.</description>
+</property>
+
+<property>
+  <name>hive.client.stats.counters</name>
+  <value></value>
+  <description>Subset of counters that should be of interest for hive.client.stats.publishers (when one wants to limit their publishing). Non-display names should be used.</description>
+</property>
+
+<property>
   <name>hive.merge.mapfiles</name>
   <value>true</value>
   <description>Merge small files at the end of a map-only job</description>
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1171283)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -123,6 +123,7 @@
     PREEXECHOOKS("hive.exec.pre.hooks", ""),
     POSTEXECHOOKS("hive.exec.post.hooks", ""),
     ONFAILUREHOOKS("hive.exec.failure.hooks", ""),
+    CLIENTSTATSPUBLISHERS("hive.client.stats.publishers", ""),
     EXECPARALLEL("hive.exec.parallel", false), // parallel query launching
     EXECPARALLETHREADNUMBER("hive.exec.parallel.thread.number", 8),
     HIVESPECULATIVEEXECREDUCERS("hive.mapred.reduce.tasks.speculative.execution", true),
@@ -410,8 +411,9 @@
         3000), // # milliseconds to wait before the next retry
     HIVE_STATS_COLLECT_RAWDATASIZE("hive.stats.collect.rawdatasize", true), // should the raw data size be collected when analayzing tables
+    CLIENT_STATS_COUNTERS("hive.client.stats.counters", ""),
+    // Subset of counters that should be of interest for hive.client.stats.publishers (when one wants to limit their publishing). Non-display names should be used.
-
     // Concurrency
     HIVE_SUPPORT_CONCURRENCY("hive.support.concurrency", false),
     HIVE_LOCK_MANAGER("hive.lock.manager", "org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager"),
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java (revision 1171283)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java (working copy)
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Collections;
 import java.util.HashMap;
@@ -30,6 +31,9 @@
 import java.util.Set;
 import java.util.Enumeration;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.MapRedStats;
 import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter;
@@ -38,6 +42,7 @@
 import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.ql.stats.ClientStatsPublisher;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.JobClient;
@@ -54,6 +59,8 @@
 
 public class HadoopJobExecHelper {
 
+  static final private Log LOG = LogFactory.getLog(HadoopJobExecHelper.class.getName());
+
   protected transient JobConf job;
   protected Task<? extends Serializable> task;
@@ -225,6 +232,7 @@
     long cpuMsec = -1;
     int numMap = -1;
     int numReduce = -1;
+    List<ClientStatsPublisher> clientStatPublishers = getClientStatPublishers();
 
     while (!rj.isComplete()) {
       try {
@@ -363,6 +371,14 @@
         }
       }
 
+      // Prepare data for Client Stat Publishers (if any present) and execute them
+      if (clientStatPublishers.size() > 0) {
+        Map<String, Double> extractedCounters = extractAllCounterValues(ctrs);
+        for (ClientStatsPublisher clientStatPublisher : clientStatPublishers) {
+          clientStatPublisher.run(extractedCounters, rj.getID().toString());
+        }
+      }
+
       Counter counterCpuMsec = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
           "CPU_MILLISECONDS");
       if (counterCpuMsec != null) {
@@ -704,6 +720,7 @@
     if (SessionState.get() != null) {
       SessionState.get().getLastMapRedStatsList().add(mapRedStats);
     }
+
     boolean success = mapRedStats.isSuccess();
 
     String statusMesg = getJobEndMsg(rj.getJobID());
@@ -728,4 +745,40 @@
 
     return returnVal;
   }
+
+  private Map<String, Double> extractAllCounterValues(Counters counters) {
+    Map<String, Double> extractedCounters = new HashMap<String, Double>();
+    for (Counters.Group cg : counters) {
+      for (Counter c : cg) {
+        extractedCounters.put(cg.getName() + "::" + c.getName(), new Double(c.getCounter()));
+      }
+    }
+    return extractedCounters;
+  }
+
+  private List<ClientStatsPublisher> getClientStatPublishers() {
+    List<ClientStatsPublisher> clientStatsPublishers = new ArrayList<ClientStatsPublisher>();
+    String confString = HiveConf.getVar(job, HiveConf.ConfVars.CLIENTSTATSPUBLISHERS);
+    confString = confString.trim();
+    if (confString.equals("")) {
+      return clientStatsPublishers;
+    }
+
+    String[] clientStatsPublisherClasses = confString.split(",");
+
+    for (String clientStatsPublisherClass : clientStatsPublisherClasses) {
+      try {
+        clientStatsPublishers.add((ClientStatsPublisher) Class.forName(
+            clientStatsPublisherClass.trim(), true, JavaUtils.getClassLoader()).newInstance());
+      } catch (RuntimeException e) {
+        throw e;
+      } catch (Exception e) {
+        LOG.warn(e.getClass().getName() + " occurred when trying to create class: "
+            + clientStatsPublisherClass.trim() + " implementing ClientStatsPublisher interface");
+        LOG.warn("The exception message is: " + e.getMessage());
+        LOG.warn("Program will continue, but without this ClientStatsPublisher working");
+      }
+    }
+    return clientStatsPublishers;
+  }
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/stats/ClientStatsPublisher.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/stats/ClientStatsPublisher.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/stats/ClientStatsPublisher.java (revision 0)
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.stats;
+
+import java.util.Map;
+
+public interface ClientStatsPublisher {
+
+  public void run(Map<String, Double> counterValues, String jobID);
+
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java (revision 1171283)
+++ ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java (working copy)
@@ -299,7 +299,7 @@
   }
 
   /**
-   * Called at the start of job Driver.run().
+   * Called at the start of job Driver.execute().
    */
   public void startQuery(String cmd, String id) {
     SessionState ss = SessionState.get();
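
Illustrative example (not part of the patch above): a minimal ClientStatsPublisher implementation could look like the sketch below; the class name com.example.CounterLoggingPublisher is hypothetical. Because HadoopJobExecHelper creates publishers reflectively via Class.forName(...).newInstance(), an implementation needs a public no-argument constructor. Its run() method is invoked on every progress poll of a running job, with counter keys of the form "GroupName::CounterName" as built by extractAllCounterValues().

package com.example;

import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.stats.ClientStatsPublisher;

// Hypothetical sketch: simply logs every counter value it is handed.
public class CounterLoggingPublisher implements ClientStatsPublisher {

  private static final Log LOG = LogFactory.getLog(CounterLoggingPublisher.class.getName());

  @Override
  public void run(Map<String, Double> counterValues, String jobID) {
    // Keys look like "org.apache.hadoop.mapred.Task$Counter::CPU_MILLISECONDS".
    for (Map.Entry<String, Double> entry : counterValues.entrySet()) {
      LOG.info(jobID + " " + entry.getKey() + " = " + entry.getValue());
    }
  }
}

To try such a publisher, the jar containing the class would be put on Hive's classpath (e.g. via hive.aux.jars.path) and the class listed in hive.client.stats.publishers, for example hive.client.stats.publishers=com.example.CounterLoggingPublisher in hive-site.xml. Note that in this patch hive.client.stats.counters is only declared; restricting output to that subset of counters would have to be handled by the publisher itself or by later code.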