diff --git conf/hive-default.xml.template conf/hive-default.xml.template index c574ab5..73f7c88 100644 --- conf/hive-default.xml.template +++ conf/hive-default.xml.template @@ -1786,6 +1786,26 @@ + hive.fetch.task.conversion.threshold + -1 + + Input threshold for applying hive.fetch.task.conversion. If the target table is native, the input length + is calculated as the sum of its file lengths. If it is not native, the table's storage handler + can optionally implement the org.apache.hadoop.hive.ql.metadata.InputEstimator interface. + + + + + hive.fetch.task.aggr + false + + Aggregation queries with no group-by clause (for example, select count(*) from src) execute the + final aggregation in a single reduce task. If this is set to true, Hive delegates the final + aggregation stage to the fetch task, possibly decreasing the query time. + + + + hive.cache.expr.evaluation true @@ -1797,17 +1817,6 @@ - - - hive.fetch.task.conversion.threshold - -1 - - Input threshold for applying hive.fetch.task.conversion. If target table is native, input length - is calculated by summation of file lengths. If it's not native, storage handler for the table - can optionally implement org.apache.hadoop.hive.ql.metadata.InputEstimator interface. 
- - - hive.hmshandler.retry.attempts 1 diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchAggregation.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchAggregation.java index 476af4b..58a64c5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchAggregation.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchAggregation.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; // execute final aggregation stage for simple fetch query on fetch task public class SimpleFetchAggregation implements Transform { @@ -98,6 +99,10 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, TableDesc tsDesc = createIntermediateFS(pGBY, fileName); for (AggregationDesc aggregation : cGBY.getConf().getAggregators()) { + GenericUDAFEvaluator evaluator = aggregation.getGenericUDAFEvaluator(); + if (!evaluator.makesCompactResult()) { + return null; /* NOTE(review): this bail-out runs after createIntermediateFS(pGBY, fileName) above (presumably that already modifies the plan - confirm) and after earlier aggregators' parameters were already rewritten by setParameters(backtrack(...)); consider checking makesCompactResult() for every aggregator before any mutation. */ + } List parameters = aggregation.getParameters(); aggregation.setParameters(ExprNodeDescUtils.backtrack(parameters, cGBY, pGBY)); } diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFEvaluator.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFEvaluator.java index 5668a3b..416ea5d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFEvaluator.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFEvaluator.java @@ -20,6 +20,8 @@ import java.io.Closeable; import java.io.IOException; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; import org.apache.hadoop.hive.ql.exec.MapredContext; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -41,8 +43,10 @@ @UDFType(deterministic = true) public abstract class GenericUDAFEvaluator implements Closeable { + 
@Retention(RetentionPolicy.RUNTIME) /* RUNTIME retention is required so getAnnotation() can read it reflectively */ public static @interface AggregationType { boolean estimable() default false; + boolean compact() default true; /* false marks buffers whose final result is not much smaller than the input; read by makesCompactResult() */ } public static boolean isEstimable(AggregationBuffer buffer) { @@ -55,6 +59,22 @@ public static boolean isEstimable(AggregationBuffer buffer) { } /** + * Generally, a UDAF produces output much smaller than its input. The fetch aggregation optimizer + * exploits this trait to run the final aggregation in the fetch task rather than in a reduce task. + * If that is not the case (e.g. building a map over the whole group), the UDAF implementor should + * annotate the aggregation buffer class with AggregationType(compact=false). Returns true when the buffer class is unannotated or declares compact=true, and false otherwise, including when getNewAggregationBuffer() throws a HiveException. + */ + public boolean makesCompactResult() { + try { + AggregationBuffer buffer = getNewAggregationBuffer(); + AggregationType annotation = buffer.getClass().getAnnotation(AggregationType.class); + return annotation == null || annotation.compact(); + } catch (HiveException e) { + return false; // should not happen; conservatively report the result as non-compact + } + } + + /** * Mode. * */