Index: conf/hive-default.xml =================================================================== --- conf/hive-default.xml (revision 1170867) +++ conf/hive-default.xml (working copy) @@ -551,6 +551,18 @@ + hive.client.stats.publishers + + Comma-separated list of statistics publishers to be invoked on counters on each job. A client stats publisher is specified as the name of a Java class which implements the org.apache.hadoop.hive.ql.stats.ClientStatsPublisher interface. + + + + hive.client.stats.counters + + Subset of counters that should be of interest for hive.client.stats.publishers (when one wants to limit their publishing). Display names should be used + + + hive.merge.mapfiles true Merge small files at the end of a map-only job Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java =================================================================== --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1170867) +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy) @@ -123,6 +123,7 @@ PREEXECHOOKS("hive.exec.pre.hooks", ""), POSTEXECHOOKS("hive.exec.post.hooks", ""), ONFAILUREHOOKS("hive.exec.failure.hooks", ""), + CLIENTSTATSPUBLISHERS("hive.client.stats.publishers", ""), EXECPARALLEL("hive.exec.parallel", false), // parallel query launching EXECPARALLETHREADNUMBER("hive.exec.parallel.thread.number", 8), HIVESPECULATIVEEXECREDUCERS("hive.mapred.reduce.tasks.speculative.execution", true), @@ -410,8 +411,9 @@ 3000), // # milliseconds to wait before the next retry HIVE_STATS_COLLECT_RAWDATASIZE("hive.stats.collect.rawdatasize", true), // should the raw data size be collected when analayzing tables + CLIENT_STATS_COUNTERS("hive.client.stats.counters", ""), + //Subset of counters that should be of interest for hive.client.stats.publishers (when one wants to limit their publishing). Display names should be used. 
- // Concurrency HIVE_SUPPORT_CONCURRENCY("hive.support.concurrency", false), HIVE_LOCK_MANAGER("hive.lock.manager", "org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager"), Index: ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java (revision 1170867) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java (working copy) @@ -21,15 +21,19 @@ import java.io.IOException; import java.io.Serializable; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Calendar; import java.util.Collections; +import java.util.Enumeration; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.Enumeration; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.MapRedStats; import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter; @@ -38,22 +42,23 @@ import org.apache.hadoop.hive.ql.history.HiveHistory.Keys; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.hive.ql.stats.ClientStatsPublisher; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.mapred.Counters; +import org.apache.hadoop.mapred.Counters.Counter; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RunningJob; import org.apache.hadoop.mapred.TaskCompletionEvent; import org.apache.hadoop.mapred.TaskReport; -import org.apache.hadoop.mapred.Counters.Counter; import org.apache.log4j.Appender; -import org.apache.log4j.BasicConfigurator; import org.apache.log4j.FileAppender; import 
org.apache.log4j.LogManager; -import org.apache.log4j.PropertyConfigurator; public class HadoopJobExecHelper { + static final private Log LOG = LogFactory.getLog(HadoopJobExecHelper.class.getName()); + protected transient JobConf job; protected Task task; @@ -363,6 +368,14 @@ } } + //Get all ClientStatsPublishers, prepare data for them (if any publishers present) and execute them + if (getClientStatPublishers().size() > 0){ + Map extractedCounters = extractAllCounterValues(ctrs); + for(ClientStatsPublisher clientStatPublisher : getClientStatPublishers()){ + clientStatPublisher.run(extractedCounters, rj.getID()); + } + } + Counter counterCpuMsec = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter", "CPU_MILLISECONDS"); if (counterCpuMsec != null) { @@ -704,6 +717,7 @@ if (SessionState.get() != null) { SessionState.get().getLastMapRedStatsList().add(mapRedStats); } + boolean success = mapRedStats.isSuccess(); String statusMesg = getJobEndMsg(rj.getJobID()); @@ -728,4 +742,38 @@ return returnVal; } + + private Map extractAllCounterValues(Counters counters) { + Map extractedCounters = new HashMap(); + for (Counters.Group cg : counters){ + for(Counter c : cg){ + extractedCounters.put(cg.getDisplayName() + "::" + c.getDisplayName(), new Double(c.getCounter())); + } + } + return extractedCounters; + } + + private List getClientStatPublishers(){ + List clientStatsPublishers = new ArrayList(); + String confString = HiveConf.getVar(job, HiveConf.ConfVars.CLIENTSTATSPUBLISHERS); + confString = confString.trim(); + if (confString.equals("")) { + return clientStatsPublishers; + } + + String[] clientStatsPublisherClasses = confString.split(","); + + for (String clientStatsPublisherClass : clientStatsPublisherClasses) { + try { + clientStatsPublishers.add((ClientStatsPublisher) Class.forName(clientStatsPublisherClass.trim(), true, JavaUtils.getClassLoader()).newInstance()); + } catch (RuntimeException e){ + throw e; + } catch (Exception e) { 
LOG.warn(e.getClass().getName() + " occurred when trying to create class: " + clientStatsPublisherClass.trim() + " implementing ClientStatsPublisher interface"); + LOG.warn("The exception message is: " + e.getMessage()); + LOG.warn("Program will continue, but without this ClientStatsPublisher working"); + } + } + return clientStatsPublishers; + } } Index: ql/src/java/org/apache/hadoop/hive/ql/stats/ClientStatsPublisher.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/stats/ClientStatsPublisher.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/stats/ClientStatsPublisher.java (revision 0) @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.stats; + +import java.util.Map; + +import org.apache.hadoop.mapred.JobID; + +public interface ClientStatsPublisher { + + public void run(Map keysAndValues, JobID jobID); + +} Index: ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java (revision 1170867) +++ ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java (working copy) @@ -299,7 +299,7 @@ } /** - * Called at the start of job Driver.run(). + * Called at the start of job Driver.execute(). */ public void startQuery(String cmd, String id) { SessionState ss = SessionState.get();