diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmEvent.java index 33341ad..d4c127b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmEvent.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmEvent.java @@ -46,7 +46,7 @@ @JsonProperty("poolName") private final String poolName; @JsonProperty("clusterPercent") - private final double clusterPercent; + private double clusterPercent; WmTezSessionInfo(WmTezSession wmTezSession) { this.poolName = wmTezSession.getPoolName(); @@ -66,6 +66,10 @@ public double getClusterPercent() { return clusterPercent; } + public void updateClusterFraction(final double clusterFraction) { + clusterPercent = clusterFraction * 100; + } + @Override public String toString() { return "SessionId: " + sessionId + " Pool: " + poolName + " Cluster %: " + clusterPercent; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java index d4c3ab9..51deea3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java @@ -18,21 +18,22 @@ package org.apache.hadoop.hive.ql.exec.tez; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; - -import java.util.concurrent.Future; +import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; + import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.wm.WmContext; import org.apache.hadoop.hive.registry.impl.TezAmInstance; import org.apache.hive.common.util.Ref; -import org.codehaus.jackson.annotate.JsonIgnore; import org.codehaus.jackson.annotate.JsonProperty; import org.codehaus.jackson.map.annotate.JsonSerialize; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; + @JsonSerialize public class WmTezSession extends TezSessionPoolSession implements AmPluginNode { @JsonProperty("poolName") @@ -150,6 +151,22 @@ public String getPoolName() { void setClusterFraction(double fraction) { this.clusterFraction = fraction; + refreshClusterFractionInWmEvents(); + } + + private void refreshClusterFractionInWmEvents() { + WmContext wmContext = getWmContext(); + if (wmContext != null) { + List events = wmContext.getQueryWmEvents(); + if (events != null && !events.isEmpty()) { + for (WmEvent wmEvent : events) { + if (wmEvent.getWmTezSessionInfo().getSessionId().equals(getSessionId()) && + wmEvent.getWmTezSessionInfo().getSessionId().equals(poolName)) { + wmEvent.getWmTezSessionInfo().updateClusterFraction(clusterFraction); + } + } + } + } } void clearWm() { @@ -199,7 +216,9 @@ boolean setFailedToSendGuaranteed() { } public void handleUpdateError(int endpointVersion) { - wmParent.addUpdateError(this, endpointVersion); + if (wmParent != null) { + wmParent.addUpdateError(this, endpointVersion); + } } @Override diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/WmContext.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/WmContext.java index 7a7ef50..6cb55f1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/WmContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/WmContext.java @@ -72,7 +72,7 @@ public WmContext(final long queryStartTime, final String queryId) { this.queryCompleted = false; } - public Set getAppliedTriggers() { + private Set getAppliedTriggers() { return appliedTriggers; } @@ -139,7 +139,7 @@ public long getQueryEndTime() { return queryEndTime; } - List getQueryWmEvents() { + public List getQueryWmEvents() { return queryWmEvents; }