diff --git common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java index 7badcdb..c6a1cbd 100644 --- common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java +++ common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java @@ -47,6 +47,9 @@ public String getAggregator(Configuration conf) { public String getPublisher(Configuration conf) { return "org.apache.hadoop.hive.ql.stats.CounterStatsPublisher"; } public String getAggregator(Configuration conf) { + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_OPTIMIZE_TEZ)) { + return "org.apache.hadoop.hive.ql.stats.CounterStatsAggregatorTez"; + } return "org.apache.hadoop.hive.ql.stats.CounterStatsAggregator"; } }, custom { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 6ba2e2d..044bb0f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -21,9 +21,11 @@ import java.io.IOException; import java.net.URISyntaxException; import java.util.Collections; +import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import javax.security.auth.login.LoginException; @@ -48,12 +50,14 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.tez.client.TezSession; +import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.Edge; import org.apache.tez.dag.api.SessionNotRunning; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.client.DAGClient; +import org.apache.tez.dag.api.client.StatusGetOpts; /** * @@ -67,10 +71,16 @@ private static final String CLASS_NAME = TezTask.class.getName(); private final PerfLogger perfLogger = PerfLogger.getPerfLogger(); + private TezCounters counters; + public TezTask() { super(); } + public TezCounters getTezCounters() { + return counters; + } + @Override public int execute(DriverContext driverContext) { int rc = 1; @@ -126,6 +136,10 @@ public int execute(DriverContext driverContext) { TezJobMonitor monitor = new TezJobMonitor(); rc = monitor.monitorExecution(client); + // fetch the counters + Set statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS); + counters = client.getDAGStatus(statusGetOpts).getDAGCounters(); + } catch (Exception e) { LOG.error("Failed to execute tez graph.", e); // rc will be 1 at this point indicating failure. diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorTez.java ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorTez.java new file mode 100644 index 0000000..5e703ec --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorTez.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.stats; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.tez.TezTask; +import org.apache.tez.common.counters.TezCounters; + +public class CounterStatsAggregatorTez implements StatsAggregator { + + private static final Log LOG = LogFactory.getLog(CounterStatsAggregatorTez.class.getName()); + + private TezCounters counters; + + @Override + public boolean connect(Configuration hconf, Task sourceTask) { + counters = ((TezTask) sourceTask).getTezCounters(); + return counters != null; + } + + @Override + public String aggregateStats(String keyPrefix, String statType) { + long value = 0; + for (String groupName : counters.getGroupNames()) { + if (groupName.startsWith(keyPrefix)) { + value += counters.getGroup(groupName).findCounter(statType).getValue(); + } + } + return String.valueOf(value); + } + + @Override + public boolean closeConnection() { + return true; + } + + @Override + public boolean cleanUp(String keyPrefix) { + return true; + } +}