diff --git common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java
index 2118723..efc2c6b 100644
--- common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java
+++ common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java
@@ -37,6 +37,11 @@
    */
   public static final String JDBC_IMPL_CLASS_VAL = "jdbc";
 
+  /**
+   * The value of the user variable "hive.stats.dbclass" to use the Hadoop counter implementation.
+   */
+  public static final String COUNTER_IMPL_CLASS_VAL = "counter";
+
   // statistics stored in metastore
   /**
    * The name of the statistic Num Files to be published or gathered.
diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 32ab3d8..b21f35e 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -603,8 +603,8 @@
 
     // Statistics
     HIVESTATSAUTOGATHER("hive.stats.autogather", true),
-    HIVESTATSDBCLASS("hive.stats.dbclass",
-        "jdbc:derby"), // other options are jdbc:mysql and hbase as defined in StatsSetupConst.java
+    HIVESTATSDBCLASS("hive.stats.dbclass", "jdbc:derby",
+        new PatternValidator("jdbc(:.*)", "hbase", "counter")), // accepted values are defined in StatsSetupConst.java
     HIVESTATSJDBCDRIVER("hive.stats.jdbcdriver",
         "org.apache.derby.jdbc.EmbeddedDriver"), // JDBC driver specific to the dbclass
     HIVESTATSDBCONNECTIONSTRING("hive.stats.dbconnectionstring",
@@ -1359,6 +1359,27 @@ public String validate(String value) {
     }
   }
 
+  public static class PatternValidator implements Validator {
+    private final List<Pattern> expected = new ArrayList<Pattern>();
+    private PatternValidator(String... values) {
+      for (String value : values) {
+        expected.add(Pattern.compile(value));
+      }
+    }
+    @Override
+    public String validate(String value) {
+      if (value == null) {
+        return "Invalid value. Expected one of patterns " + expected;
+      }
+      for (Pattern pattern : expected) {
+        if (pattern.matcher(value).matches()) {
+          return null;
+        }
+      }
+      return "Invalid value. Expected one of patterns " + expected;
+    }
+  }
+
   public static class RatioValidator implements Validator {
     @Override
     public String validate(String value) {
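To make the new validation concrete, here is a minimal standalone sketch of how PatternValidator treats hive.stats.dbclass values (the demo class and method are hypothetical; the patterns are the ones registered above). A value passes only if it fully matches one of the patterns, so "jdbc:derby" and "jdbc:mysql" are accepted via "jdbc(:.*)", while anything else yields the error string:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical demo class mirroring the PatternValidator logic added above.
public class PatternValidatorDemo {
  static String validate(List<Pattern> expected, String value) {
    if (value == null) {
      return "Invalid value. Expected one of patterns " + expected;
    }
    for (Pattern pattern : expected) {
      if (pattern.matcher(value).matches()) {
        return null; // null signals a valid value
      }
    }
    return "Invalid value. Expected one of patterns " + expected;
  }

  public static void main(String[] args) {
    List<Pattern> expected = new ArrayList<Pattern>();
    for (String regex : new String[] {"jdbc(:.*)", "hbase", "counter"}) {
      expected.add(Pattern.compile(regex));
    }
    System.out.println(validate(expected, "jdbc:derby")); // null -> valid
    System.out.println(validate(expected, "counter"));    // null -> valid
    System.out.println(validate(expected, "mongodb"));    // error message
  }
}
```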
diff --git conf/hive-default.xml.template conf/hive-default.xml.template
index c574ab5..5e3b761 100644
--- conf/hive-default.xml.template
+++ conf/hive-default.xml.template
@@ -1203,8 +1203,8 @@
   <name>hive.stats.dbclass</name>
-  <value>jdbc:derby</value>
-  <description>The default database that stores temporary hive statistics.</description>
+  <value>counter</value>
+  <description>The default storage that stores temporary hive statistics. Currently, jdbc, hbase and counter types are supported.</description>
 </property>
 
diff --git data/conf/hive-site.xml data/conf/hive-site.xml
index 8aefb3b..cf8ed15 100644
--- data/conf/hive-site.xml
+++ data/conf/hive-site.xml
@@ -186,4 +186,9 @@
   <description>The default SerDe hive will use for the rcfile format</description>
 </property>
 
+<property>
+  <name>hive.stats.dbclass</name>
+  <value>jdbc:derby</value>
+  <description>The default storage that stores temporary hive statistics. Currently, jdbc, hbase and counter types are supported.</description>
+</property>
 </configuration>
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStatsAggregator.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStatsAggregator.java
index abde695..a9c3136 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStatsAggregator.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStatsAggregator.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.PrefixFilter;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.stats.StatsAggregator;
 
@@ -46,7 +47,7 @@
   /**
    * Does the necessary HBase initializations.
    */
-  public boolean connect(Configuration hiveconf) {
+  public boolean connect(Configuration hiveconf, MapRedTask sourceTask) {
     try {
       htable = new HTable(HBaseConfiguration.create(hiveconf),
diff --git itests/qtest/pom.xml itests/qtest/pom.xml
index a453d8a..6c3a53a 100644
--- itests/qtest/pom.xml
+++ itests/qtest/pom.xml
@@ -36,7 +36,7 @@
     false
     false
-    list_bucket_dml_10.q,input16_cc.q,scriptfile1.q,scriptfile1_win.q,bucket4.q,bucketmapjoin6.q,disable_merge_for_bucketing.q,reduce_deduplicate.q,smb_mapjoin_8.q,join1.q,groupby2.q,bucketizedhiveinputformat.q,bucketmapjoin7.q,optrstat_groupby.q,bucket_num_reducers.q,bucket5.q,load_fs2.q,bucket_num_reducers2.q,infer_bucket_sort_merge.q,infer_bucket_sort_reducers_power_two.q,infer_bucket_sort_dyn_part.q,infer_bucket_sort_bucketed_table.q,infer_bucket_sort_map_operators.q,infer_bucket_sort_num_buckets.q,leftsemijoin_mr.q,schemeAuthority.q,schemeAuthority2.q,truncate_column_buckets.q,remote_script.q,,load_hdfs_file_with_space_in_the_name.q,parallel_orderby.q,import_exported_table.q
+    list_bucket_dml_10.q,input16_cc.q,scriptfile1.q,scriptfile1_win.q,bucket4.q,bucketmapjoin6.q,disable_merge_for_bucketing.q,reduce_deduplicate.q,smb_mapjoin_8.q,join1.q,groupby2.q,bucketizedhiveinputformat.q,bucketmapjoin7.q,optrstat_groupby.q,bucket_num_reducers.q,bucket5.q,load_fs2.q,bucket_num_reducers2.q,infer_bucket_sort_merge.q,infer_bucket_sort_reducers_power_two.q,infer_bucket_sort_dyn_part.q,infer_bucket_sort_bucketed_table.q,infer_bucket_sort_map_operators.q,infer_bucket_sort_num_buckets.q,leftsemijoin_mr.q,schemeAuthority.q,schemeAuthority2.q,truncate_column_buckets.q,remote_script.q,,load_hdfs_file_with_space_in_the_name.q,parallel_orderby.q,import_exported_table.q,stats_counter.q
     cluster_tasklog_retrieval.q,minimr_broken_pipe.q,mapreduce_stack_trace.q,mapreduce_stack_trace_turnoff.q,mapreduce_stack_trace_hadoop20.q,mapreduce_stack_trace_turnoff_hadoop20.q
     add_part_exist.q,alter1.q,alter2.q,alter4.q,alter5.q,alter_rename_partition.q,alter_rename_partition_authorization.q,archive.q,archive_corrupt.q,archive_multi.q,archive_mr_1806.q,archive_multi_mr_1806.q,authorization_1.q,authorization_2.q,authorization_4.q,authorization_5.q,authorization_6.q,authorization_7.q,ba_table1.q,ba_table2.q,ba_table3.q,ba_table_udfs.q,binary_table_bincolserde.q,binary_table_colserde.q,cluster.q,columnarserde_create_shortcut.q,combine2.q,constant_prop.q,create_nested_type.q,create_or_replace_view.q,create_struct_table.q,create_union_table.q,database.q,database_location.q,database_properties.q,ddltime.q,describe_database_json.q,drop_database_removes_partition_dirs.q,escape1.q,escape2.q,exim_00_nonpart_empty.q,exim_01_nonpart.q,exim_02_00_part_empty.q,exim_02_part.q,exim_03_nonpart_over_compat.q,exim_04_all_part.q,exim_04_evolved_parts.q,exim_05_some_part.q,exim_06_one_part.q,exim_07_all_part_over_nonoverlap.q,exim_08_nonpart_rename.q,exim_09_part_spec_nonoverlap.q,exim_10_external_managed.q,exim_11_managed_external.q,exim_12_external_location.q,exim_13_managed_location.q,exim_14_managed_location_over_existing.q,exim_15_external_part.q,exim_16_part_external.q,exim_17_part_managed.q,exim_18_part_external.q,exim_19_00_part_external_location.q,exim_19_part_external_location.q,exim_20_part_managed_location.q,exim_21_export_authsuccess.q,exim_22_import_exist_authsuccess.q,exim_23_import_part_authsuccess.q,exim_24_import_nonexist_authsuccess.q,global_limit.q,groupby_complex_types.q,groupby_complex_types_multi_single_reducer.q,index_auth.q,index_auto.q,index_auto_empty.q,index_bitmap.q,index_bitmap1.q,index_bitmap2.q,index_bitmap3.q,index_bitmap_auto.q,index_bitmap_rc.q,index_compact.q,index_compact_1.q,index_compact_2.q,index_compact_3.q,index_stale_partitioned.q,init_file.q,input16.q,input16_cc.q,input46.q,input_columnarserde.q,input_dynamicserde.q,input_lazyserde.q,input_testxpath3.q,input_testxpath4.q,insert2_overwrite_partitions.q,insertexternal1.q,join_thrift.q,lateral_view.q,load_binary_data.q,load_exist_part_authsuccess.q,load_nonpart_authsuccess.q,load_part_authsuccess.q,loadpart_err.q,lock1.q,lock2.q,lock3.q,lock4.q,merge_dynamic_partition.q,multi_insert.q,multi_insert_move_tasks_share_dependencies.q,null_column.q,ppd_clusterby.q,query_with_semi.q,rename_column.q,sample6.q,sample_islocalmode_hook.q,set_processor_namespaces.q,show_tables.q,source.q,split_sample.q,str_to_map.q,transform1.q,udaf_collect_set.q,udaf_context_ngrams.q,udaf_histogram_numeric.q,udaf_ngrams.q,udaf_percentile_approx.q,udf_array.q,udf_bitmap_and.q,udf_bitmap_or.q,udf_explode.q,udf_format_number.q,udf_map.q,udf_map_keys.q,udf_map_values.q,udf_max.q,udf_min.q,udf_named_struct.q,udf_percentile.q,udf_printf.q,udf_sentences.q,udf_sort_array.q,udf_split.q,udf_struct.q,udf_substr.q,udf_translate.q,udf_union.q,udf_xpath.q,udtf_stack.q,view.q,virtual_column.q
diff --git itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/KeyVerifyingStatsAggregator.java itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/KeyVerifyingStatsAggregator.java
index fafd68b..8fa5c3e 100644
--- itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/KeyVerifyingStatsAggregator.java
+++ itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/KeyVerifyingStatsAggregator.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.stats;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.session.SessionState;
 
 /**
@@ -29,7 +30,7 @@
 public class KeyVerifyingStatsAggregator implements StatsAggregator {
 
-  public boolean connect(Configuration hconf) {
+  public boolean connect(Configuration hconf, MapRedTask sourceTask) {
     return true;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
index b9456e8..42e8dab 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
@@ -28,9 +28,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.Warehouse;
@@ -192,7 +190,7 @@ private int aggregateStats() {
       }
       statsAggregator = StatsFactory.getStatsAggregator(); // manufacture a StatsAggregator
-      if (!statsAggregator.connect(conf)) {
+      if (!statsAggregator.connect(conf, getWork().getSourceTask())) {
         throw new HiveException("StatsAggregator connect failed " + statsImplementationClass);
       }
     }
@@ -445,19 +443,4 @@ private void updateStats(String[] statsList, Statistics stats,
     }
     return list;
   }
-
-  /**
-   * This method is static as it is called from the shutdown hook at the ExecDriver.
-   */
-  public static void cleanUp(String jobID, Configuration config) {
-    StatsAggregator statsAggregator;
-    String statsImplementationClass = HiveConf.getVar(config, HiveConf.ConfVars.HIVESTATSDBCLASS);
-    StatsFactory.setImplementation(statsImplementationClass, config);
-    statsAggregator = StatsFactory.getStatsAggregator();
-    if (statsAggregator.connect(config)) {
-      statsAggregator.cleanUp(jobID + Path.SEPARATOR); // Adding the path separator to avoid an Id
-                                                       // being a prefix of another ID
-      statsAggregator.closeConnection();
-    }
-  }
 }
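Every StatsAggregator now receives the MapRedTask that produced the partial stats; implementations that do not need it (JDBC, HBase) simply ignore the argument, and the tests pass null. A minimal no-op implementation of the widened contract looks like this (illustrative only, not part of the patch):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
import org.apache.hadoop.hive.ql.stats.StatsAggregator;

// Hypothetical no-op aggregator showing the updated interface contract.
public class NoopStatsAggregator implements StatsAggregator {

  public boolean connect(Configuration hconf, MapRedTask sourceTask) {
    // sourceTask identifies the MapReduce job whose counters hold the
    // partial stats; it may be null when no such job exists (e.g. tests).
    return true;
  }

  public String aggregateStats(String keyPrefix, String statType) {
    return "0"; // a real implementation sums partial stats for the prefix
  }

  public boolean closeConnection() {
    return true;
  }

  public boolean cleanUp(String keyPrefix) {
    return true;
  }
}
```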
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
index 686a380..cca8481 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
@@ -44,6 +44,7 @@
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
 import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
 import org.apache.hadoop.hive.ql.lib.Node;
@@ -221,7 +222,7 @@ private void processLinkedFileDesc(GenMRProcContext ctx,
   private void addStatsTask(FileSinkOperator nd, MoveTask mvTask,
       Task<? extends Serializable> currTask, HiveConf hconf) {
 
-    MoveWork mvWork = ((MoveTask) mvTask).getWork();
+    MoveWork mvWork = mvTask.getWork();
     StatsWork statsWork = null;
     if (mvWork.getLoadTableWork() != null) {
       statsWork = new StatsWork(mvWork.getLoadTableWork());
@@ -229,12 +230,14 @@
       statsWork = new StatsWork(mvWork.getLoadFileWork());
     }
     assert statsWork != null : "Error when generating StatsTask";
+
+    statsWork.setSourceTask((MapRedTask) currTask);
     statsWork.setStatsReliable(hconf.getBoolVar(ConfVars.HIVE_STATS_RELIABLE));
     MapredWork mrWork = (MapredWork) currTask.getWork();
 
     // AggKey in StatsWork is used for stats aggregation while StatsAggPrefix
     // in FileSinkDesc is used for stats publishing. They should be consistent.
-    statsWork.setAggKey(((FileSinkOperator) nd).getConf().getStatsAggPrefix());
+    statsWork.setAggKey(nd.getConf().getStatsAggPrefix());
     Task<? extends Serializable> statsTask = TaskFactory.get(statsWork, hconf);
 
     // mark the MapredWork and FileSinkOperator for gathering stats
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java
index 51fceed..0dd0b03 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java
@@ -20,6 +20,7 @@
 
 import java.io.Serializable;
 
+import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.tableSpec;
 
 /**
@@ -49,6 +50,8 @@
 
   private boolean isPartialScanAnalyzeCommand = false;
 
+  private transient MapRedTask sourceTask;
+
   public StatsWork() {
   }
 
@@ -140,4 +143,12 @@ public boolean isPartialScanAnalyzeCommand() {
   public void setPartialScanAnalyzeCommand(boolean isPartialScanAnalyzeCommand) {
     this.isPartialScanAnalyzeCommand = isPartialScanAnalyzeCommand;
   }
+
+  public MapRedTask getSourceTask() {
+    return sourceTask;
+  }
+
+  public void setSourceTask(MapRedTask sourceTask) {
+    this.sourceTask = sourceTask;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregator.java ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregator.java
new file mode 100644
index 0000000..2cc2519
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregator.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.stats;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
+import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
+
+public class CounterStatsAggregator implements StatsAggregator {
+
+  private static final Log LOG = LogFactory.getLog(CounterStatsAggregator.class.getName());
+
+  private Counters counters;
+  private JobClient jc;
+
+  @Override
+  public boolean connect(Configuration hconf, MapRedTask sourceTask) {
+    try {
+      jc = new JobClient(toJobConf(hconf));
+      RunningJob job = jc.getJob(sourceTask.getJobID());
+      if (job != null) {
+        counters = job.getCounters();
+      }
+    } catch (Exception e) {
+      LOG.error("Failed to get Job instance for " + sourceTask.getJobID(), e);
+    }
+    return counters != null;
+  }
+
+  private JobConf toJobConf(Configuration hconf) {
+    return hconf instanceof JobConf ? (JobConf) hconf : new JobConf(hconf, ExecDriver.class);
+  }
+
+  @Override
+  public String aggregateStats(String keyPrefix, String statType) {
+    long value = 0;
+    for (String groupName : counters.getGroupNames()) {
+      if (groupName.startsWith(keyPrefix)) {
+        value += counters.getGroup(groupName).getCounter(statType);
+      }
+    }
+    return String.valueOf(value);
+  }
+
+  @Override
+  public boolean closeConnection() {
+    try {
+      jc.close();
+    } catch (IOException e) {
+      LOG.error("Error closing job client for stats aggregator.", e);
+    }
+    return true;
+  }
+
+  @Override
+  public boolean cleanUp(String keyPrefix) {
+    return true;
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsPublisher.java ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsPublisher.java
new file mode 100644
index 0000000..f59e60a
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsPublisher.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.stats;
+
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.mapred.Reporter;
+
+public class CounterStatsPublisher implements StatsPublisher {
+
+  private static final Log LOG = LogFactory.getLog(CounterStatsPublisher.class.getName());
+
+  private Reporter reporter;
+
+  @Override
+  public boolean init(Configuration hconf) {
+    return true;
+  }
+
+  @Override
+  public boolean connect(Configuration hconf) {
+    MapredContext context = MapredContext.get();
+    if (context == null || context.getReporter() == null) {
+      return false;
+    }
+    reporter = context.getReporter();
+    return true;
+  }
+
+  @Override
+  public boolean publishStat(String fileID, Map<String, String> stats) {
+    for (Map.Entry<String, String> entry : stats.entrySet()) {
+      try {
+        reporter.incrCounter(fileID, entry.getKey(), Long.valueOf(entry.getValue()));
+      } catch (NumberFormatException e) {
+        LOG.error("Invalid counter value " + entry.getValue() + " for " + entry.getKey());
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public boolean closeConnection() {
+    return true;
+  }
+}
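The counter scheme works because the publisher and the aggregator agree on a naming convention: each task publishes its partial stats into a counter group named by its aggregation key (the FileSinkOperator's stats prefix plus a task-specific suffix), and the aggregator sums the requested statistic across every group sharing that prefix. A self-contained sketch of the round trip, using made-up group names and values:

```java
import org.apache.hadoop.mapred.Counters;

// Standalone sketch of the publish/aggregate round trip; the group names
// and values are made up for illustration.
public class CounterRoundTripDemo {
  public static void main(String[] args) {
    Counters counters = new Counters();

    // Publisher side: each task increments counters under its own group.
    counters.incrCounter("pfile:/warehouse/dummy/task_0001", "numRows", 300L);
    counters.incrCounter("pfile:/warehouse/dummy/task_0002", "numRows", 200L);

    // Aggregator side: sum the statistic over all groups with the prefix,
    // mirroring CounterStatsAggregator.aggregateStats above.
    String keyPrefix = "pfile:/warehouse/dummy/";
    long total = 0;
    for (String groupName : counters.getGroupNames()) {
      if (groupName.startsWith(keyPrefix)) {
        total += counters.getGroup(groupName).getCounter("numRows");
      }
    }
    System.out.println(total); // prints 500
  }
}
```

The per-task suffix keeps publishers from clobbering each other; the prefix match is what lets the aggregator fold the partial stats back together.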
diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/StatsAggregator.java ql/src/java/org/apache/hadoop/hive/ql/stats/StatsAggregator.java
index 563a1ba..661d648 100644
--- ql/src/java/org/apache/hadoop/hive/ql/stats/StatsAggregator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/stats/StatsAggregator.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.stats;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 
 /**
  * An interface for any possible implementation for gathering statistics.
@@ -31,9 +32,11 @@
    *
    * @param hconf
    *          HiveConf that contains the connection parameters.
+   * @param sourceTask
+   *          the source MapRedTask whose job published the partial stats (may be null)
    * @return true if connection is successful, false otherwise.
    */
-  public boolean connect(Configuration hconf);
+  public boolean connect(Configuration hconf, MapRedTask sourceTask);
 
   /**
    * This method aggregates a given statistic from all tasks (partial stats).
diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java
index 18290a9..a9d1606 100644
--- ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java
@@ -48,51 +48,44 @@
    */
   public static boolean setImplementation(String configurationParam, Configuration conf) {
 
-    ClassLoader classLoader = JavaUtils.getClassLoader();
-    if (configurationParam.equals(StatsSetupConst.HBASE_IMPL_CLASS_VAL)) {
+    jobConf = conf;
+    if (configurationParam.equalsIgnoreCase(StatsSetupConst.HBASE_IMPL_CLASS_VAL)) {
       // Case: hbase
-      try {
-        publisherImplementation = (Class<? extends StatsPublisher>)
-          Class.forName("org.apache.hadoop.hive.hbase.HBaseStatsPublisher", true, classLoader);
-
-        aggregatorImplementation = (Class<? extends StatsAggregator>)
-          Class.forName("org.apache.hadoop.hive.hbase.HBaseStatsAggregator", true, classLoader);
-      } catch (ClassNotFoundException e) {
-        LOG.error("HBase Publisher/Aggregator classes cannot be loaded.", e);
-        return false;
-      }
-    } else if (configurationParam.contains(StatsSetupConst.JDBC_IMPL_CLASS_VAL)) {
+      return instantiate("HBase",
+          "org.apache.hadoop.hive.hbase.HBaseStatsPublisher",
+          "org.apache.hadoop.hive.hbase.HBaseStatsAggregator");
+    }
+    if (configurationParam.contains(StatsSetupConst.JDBC_IMPL_CLASS_VAL)) {
       // Case: jdbc:mysql or jdbc:derby
-      try {
-        publisherImplementation = (Class<? extends StatsPublisher>)
-          Class.forName("org.apache.hadoop.hive.ql.stats.jdbc.JDBCStatsPublisher", true, classLoader);
-
-        aggregatorImplementation = (Class<? extends StatsAggregator>)
-          Class.forName("org.apache.hadoop.hive.ql.stats.jdbc.JDBCStatsAggregator", true, classLoader);
-      } catch (ClassNotFoundException e) {
-        LOG.error("JDBC Publisher/Aggregator classes cannot be loaded.", e);
-        return false;
-      }
-    } else {
-      // try default stats publisher/aggregator
-      String defPublisher = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_STATS_DEFAULT_PUBLISHER);
-      String defAggregator = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_STATS_DEFAULT_AGGREGATOR);
-      // ERROR no default publisher/aggregator is defined
-      if (defPublisher == null || defAggregator == null) {
-        return false;
-      }
-      try{
-        publisherImplementation = (Class<? extends StatsPublisher>)
-          Class.forName(defPublisher, true, classLoader);
-        aggregatorImplementation = (Class<? extends StatsAggregator>)
-          Class.forName(defAggregator, true, classLoader);
-      } catch (ClassNotFoundException e) {
-        LOG.error("JDBC Publisher/Aggregator classes cannot be loaded.", e);
-        return false;
-      }
+      return instantiate("JDBC",
+          "org.apache.hadoop.hive.ql.stats.jdbc.JDBCStatsPublisher",
+          "org.apache.hadoop.hive.ql.stats.jdbc.JDBCStatsAggregator");
+    }
+    if (configurationParam.equalsIgnoreCase(StatsSetupConst.COUNTER_IMPL_CLASS_VAL)) {
+      return instantiate("COUNTER",
+          "org.apache.hadoop.hive.ql.stats.CounterStatsPublisher",
+          "org.apache.hadoop.hive.ql.stats.CounterStatsAggregator");
     }
+    // try default stats publisher/aggregator
+    String defPublisher = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_STATS_DEFAULT_PUBLISHER);
+    String defAggregator = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_STATS_DEFAULT_AGGREGATOR);
+    return instantiate("DEFAULT", defPublisher, defAggregator);
+  }
 
-    jobConf = conf;
+  private static boolean instantiate(String type, String publisher, String aggregator) {
+    if (publisher == null || aggregator == null) {
+      return false;
+    }
+    ClassLoader classLoader = JavaUtils.getClassLoader();
+    try {
+      publisherImplementation = (Class<? extends StatsPublisher>)
+          Class.forName(publisher, true, classLoader);
+      aggregatorImplementation = (Class<? extends StatsAggregator>)
+          Class.forName(aggregator, true, classLoader);
+    } catch (ClassNotFoundException e) {
+      LOG.error(type + " Publisher/Aggregator classes cannot be loaded.", e);
+      return false;
+    }
     return true;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/StatsSetupConst.java ql/src/java/org/apache/hadoop/hive/ql/stats/StatsSetupConst.java
deleted file mode 100644
index e69de29..0000000
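With the dispatch refactored into instantiate(), selecting the counter implementation is a one-line configuration change. A minimal usage sketch, assuming a default HiveConf is constructible on the classpath (the demo class is hypothetical):

```java
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;

// Hypothetical caller resolving the counter publisher through the factory.
public class StatsFactoryDemo {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    conf.setVar(HiveConf.ConfVars.HIVESTATSDBCLASS, StatsSetupConst.COUNTER_IMPL_CLASS_VAL);

    // setImplementation dispatches on "hbase", "jdbc:*", "counter",
    // or falls back to the configured default publisher/aggregator classes.
    String dbclass = conf.getVar(HiveConf.ConfVars.HIVESTATSDBCLASS);
    if (StatsFactory.setImplementation(dbclass, conf)) {
      StatsPublisher publisher = StatsFactory.getStatsPublisher();
      System.out.println(publisher.getClass().getName()); // ...CounterStatsPublisher
    }
  }
}
```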
" + e); return false; } diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/TestStatsPublisherEnhanced.java ql/src/test/org/apache/hadoop/hive/ql/exec/TestStatsPublisherEnhanced.java index 5cebd34..f783fde 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/TestStatsPublisherEnhanced.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestStatsPublisherEnhanced.java @@ -58,7 +58,7 @@ protected void setUp() { protected void tearDown() { StatsAggregator sa = StatsFactory.getStatsAggregator(); assertNotNull(sa); - assertTrue(sa.connect(conf)); + assertTrue(sa.connect(conf, null)); assertTrue(sa.cleanUp("file_0")); assertTrue(sa.closeConnection()); } @@ -84,7 +84,7 @@ public void testStatsPublisherOneStat() throws Throwable { // instantiate stats aggregator StatsAggregator statsAggregator = StatsFactory.getStatsAggregator(); assertNotNull(statsAggregator); - assertTrue(statsAggregator.connect(conf)); + assertTrue(statsAggregator.connect(conf, null)); // publish stats fillStatMap("200", "1000"); @@ -132,7 +132,7 @@ public void testStatsPublisher() throws Throwable { // instantiate stats aggregator StatsAggregator statsAggregator = StatsFactory.getStatsAggregator(); assertNotNull(statsAggregator); - assertTrue(statsAggregator.connect(conf)); + assertTrue(statsAggregator.connect(conf, null)); // statsAggregator.cleanUp("file_0000"); // assertTrue(statsAggregator.connect(conf)); @@ -193,7 +193,7 @@ public void testStatsPublisherMultipleUpdates() throws Throwable { // instantiate stats aggregator StatsAggregator statsAggregator = StatsFactory.getStatsAggregator(); assertNotNull(statsAggregator); - assertTrue(statsAggregator.connect(conf)); + assertTrue(statsAggregator.connect(conf, null)); // publish stats fillStatMap("200", "1000"); @@ -258,7 +258,7 @@ public void testStatsPublisherMultipleUpdatesSubsetStatistics() throws Throwable // instantiate stats aggregator StatsAggregator statsAggregator = StatsFactory.getStatsAggregator(); assertNotNull(statsAggregator); - assertTrue(statsAggregator.connect(conf)); + assertTrue(statsAggregator.connect(conf, null)); // publish stats fillStatMap("200", ""); @@ -329,7 +329,7 @@ public void testStatsAggregatorCleanUp() throws Throwable { // instantiate stats aggregator StatsAggregator statsAggregator = StatsFactory.getStatsAggregator(); assertNotNull(statsAggregator); - assertTrue(statsAggregator.connect(conf)); + assertTrue(statsAggregator.connect(conf, null)); // publish stats fillStatMap("200", "1000"); diff --git ql/src/test/queries/clientpositive/stats_counter.q ql/src/test/queries/clientpositive/stats_counter.q new file mode 100644 index 0000000..20769e4 --- /dev/null +++ ql/src/test/queries/clientpositive/stats_counter.q @@ -0,0 +1,6 @@ +set hive.stats.autogather=true; +set hive.stats.dbclass=counter; + +create table dummy as select * from src; + +desc formatted dummy; diff --git ql/src/test/results/clientpositive/stats_counter.q.out ql/src/test/results/clientpositive/stats_counter.q.out new file mode 100644 index 0000000..f15d8c5 --- /dev/null +++ ql/src/test/results/clientpositive/stats_counter.q.out @@ -0,0 +1,41 @@ +PREHOOK: query: create table dummy as select * from src +PREHOOK: type: CREATETABLE_AS_SELECT +PREHOOK: Input: default@src +POSTHOOK: query: create table dummy as select * from src +POSTHOOK: type: CREATETABLE_AS_SELECT +POSTHOOK: Input: default@src +POSTHOOK: Output: default@dummy +PREHOOK: query: desc formatted dummy +PREHOOK: type: DESCTABLE +POSTHOOK: query: desc formatted dummy +POSTHOOK: type: DESCTABLE +# col_name data_type 
comment + +key string None +value string None + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Protect Mode: None +Retention: 0 +#### A masked pattern was here #### +Table Type: MANAGED_TABLE +Table Parameters: + COLUMN_STATS_ACCURATE true + numFiles 1 + numRows 500 + rawDataSize 5312 + totalSize 5812 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1
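One way to sanity-check the golden file: dummy is created from src (500 rows), and the counter-gathered totalSize exceeds rawDataSize by exactly 500, consistent with one row-terminator byte per row in an uncompressed text table. That reading is an inference from the numbers, not something the patch asserts:

```java
// Sanity check of the stats in the golden file above (values from the q.out).
public class StatsSanityCheck {
  public static void main(String[] args) {
    long numRows = 500;
    long rawDataSize = 5312;
    long totalSize = 5812;
    // Plain-text output plausibly adds one newline per row, so the two
    // sizes should differ by exactly numRows.
    System.out.println("stats consistent: " + (totalSize == rawDataSize + numRows));
  }
}
```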