From e56d65256829123cbcf78c01b843f2f9923806ac Mon Sep 17 00:00:00 2001 From: Ashutosh Chauhan Date: Wed, 14 Oct 2015 17:23:13 -0700 Subject: [PATCH] HIVE-12164 : Remove jdbc stats collection mechanism --- .../hadoop/hive/ql/stats/DummyStatsAggregator.java | 8 - .../hive/ql/stats/KeyVerifyingStatsAggregator.java | 5 - .../hadoop/hive/ql/exec/FileSinkOperator.java | 24 +- .../org/apache/hadoop/hive/ql/exec/StatsTask.java | 28 +- .../hadoop/hive/ql/exec/TableScanOperator.java | 10 - .../hive/ql/io/rcfile/stats/PartialScanMapper.java | 5 - .../hive/ql/stats/CounterStatsAggregator.java | 7 +- .../hive/ql/stats/CounterStatsAggregatorSpark.java | 7 +- .../hive/ql/stats/CounterStatsAggregatorTez.java | 7 +- .../hive/ql/stats/CounterStatsPublisher.java | 2 +- .../hadoop/hive/ql/stats/StatsAggregator.java | 16 - .../ql/stats/StatsCollectionTaskIndependent.java | 25 -- .../apache/hadoop/hive/ql/stats/StatsFactory.java | 3 +- .../hadoop/hive/ql/stats/fs/FSStatsAggregator.java | 8 +- .../hadoop/hive/ql/stats/fs/FSStatsPublisher.java | 3 +- .../hive/ql/stats/jdbc/JDBCStatsAggregator.java | 262 -------------- .../hive/ql/stats/jdbc/JDBCStatsPublisher.java | 349 ------------------- .../ql/stats/jdbc/JDBCStatsSetupConstants.java | 39 --- .../hadoop/hive/ql/stats/jdbc/JDBCStatsUtils.java | 212 ------------ .../hadoop/hive/ql/exec/TestFileSinkOperator.java | 5 - .../hive/ql/exec/TestStatsPublisherEnhanced.java | 384 --------------------- ql/src/test/queries/clientpositive/lb_fs_stats.q | 2 - .../metadata_only_queries_with_filters.q | 1 - ql/src/test/queries/clientpositive/statsfs.q | 2 - 24 files changed, 20 insertions(+), 1394 deletions(-) delete mode 100644 ql/src/java/org/apache/hadoop/hive/ql/stats/StatsCollectionTaskIndependent.java delete mode 100644 ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java delete mode 100644 ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java delete mode 100644 ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsSetupConstants.java delete mode 100644 ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsUtils.java delete mode 100644 ql/src/test/org/apache/hadoop/hive/ql/exec/TestStatsPublisherEnhanced.java diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/DummyStatsAggregator.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/DummyStatsAggregator.java index eb3f6eb..be69025 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/DummyStatsAggregator.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/DummyStatsAggregator.java @@ -54,12 +54,4 @@ public boolean closeConnection(StatsCollectionContext scc) { } return true; } - - @Override - public boolean cleanUp(String keyPrefix) { - if (errorMethod.equalsIgnoreCase("cleanUp")) { - return false; - } - return true; - } } diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/KeyVerifyingStatsAggregator.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/KeyVerifyingStatsAggregator.java index 4e00316..2588e72 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/KeyVerifyingStatsAggregator.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/KeyVerifyingStatsAggregator.java @@ -47,9 +47,4 @@ public String aggregateStats(String keyPrefix, String statType) { public boolean closeConnection(StatsCollectionContext scc) { return true; } - - @Override - public boolean cleanUp(String keyPrefix) { - return true; - } } diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index b7c1267..9da9499 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -62,7 +62,6 @@ import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.plan.SkewedColumnPositionPair; import org.apache.hadoop.hive.ql.plan.api.OperatorType; -import org.apache.hadoop.hive.ql.stats.StatsCollectionTaskIndependent; import org.apache.hadoop.hive.ql.stats.StatsCollectionContext; import org.apache.hadoop.hive.ql.stats.StatsPublisher; import org.apache.hadoop.hive.serde2.SerDeException; @@ -1153,7 +1152,6 @@ private void publishStats() throws HiveException { String spSpec = conf.getStaticSpec(); int maxKeyLength = conf.getMaxStatsKeyPrefixLength(); - boolean taskIndependent = statsPublisher instanceof StatsCollectionTaskIndependent; for (Map.Entry entry : valToPaths.entrySet()) { String fspKey = entry.getKey(); // DP/LB @@ -1176,30 +1174,18 @@ private void publishStats() throws HiveException { // split[0] = DP, split[1] = LB String[] split = splitKey(fspKey); String dpSpec = split[0]; - String lbSpec = split[1]; - - String prefix; - String postfix=null; - if (taskIndependent) { - // key = "database.table/SP/DP/"LB/ - // Hive store lowercase table name in metastore, and Counters is character case sensitive, so we - // use lowercase table name as prefix here, as StatsTask get table name from metastore to fetch counter. - prefix = conf.getTableInfo().getTableName().toLowerCase(); - } else { - // key = "prefix/SP/DP/"LB/taskID/ - prefix = conf.getStatsAggPrefix(); - postfix = Utilities.join(lbSpec, taskID); - } + // key = "database.table/SP/DP/"LB/ + // Hive stores the lowercase table name in the metastore, and counters are case-sensitive, so we + // use the lowercase table name as the prefix here; StatsTask gets the table name from the metastore to fetch the counter. + String prefix = conf.getTableInfo().getTableName().toLowerCase(); prefix = Utilities.join(prefix, spSpec, dpSpec); prefix = Utilities.getHashedStatsPrefix(prefix, maxKeyLength); - String key = Utilities.join(prefix, postfix); - Map statsToPublish = new HashMap(); for (String statType : fspValue.stat.getStoredStats()) { statsToPublish.put(statType, Long.toString(fspValue.stat.getStat(statType))); } - if (!statsPublisher.publishStat(key, statsToPublish)) { + if (!statsPublisher.publishStat(prefix, statsToPublish)) { // The original exception is lost. 
// Not changing the interface to maintain backward compatibility if (isStatsReliable) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java index 9775645..f71f55d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java @@ -46,7 +46,6 @@ import org.apache.hadoop.hive.ql.plan.StatsWork; import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.hadoop.hive.ql.stats.StatsAggregator; -import org.apache.hadoop.hive.ql.stats.StatsCollectionTaskIndependent; import org.apache.hadoop.hive.ql.stats.StatsCollectionContext; import org.apache.hadoop.hive.ql.stats.StatsFactory; import org.apache.hadoop.hive.ql.stats.StatsPublisher; @@ -158,8 +157,6 @@ private int aggregateStats() { int maxPrefixLength = StatsFactory.getMaxPrefixLength(conf); - // "counter" or "fs" type does not need to collect stats per task - boolean taskIndependent = statsAggregator instanceof StatsCollectionTaskIndependent; if (partitions == null) { org.apache.hadoop.hive.metastore.api.Table tTable = table.getTTable(); Map parameters = tTable.getParameters(); @@ -175,7 +172,7 @@ private int aggregateStats() { } if (statsAggregator != null) { - String prefix = getAggregationPrefix(taskIndependent, table, null); + String prefix = getAggregationPrefix(table, null); updateStats(statsAggregator, parameters, prefix, maxPrefixLength, atomic); } @@ -211,7 +208,7 @@ private int aggregateStats() { } if (statsAggregator != null) { - String prefix = getAggregationPrefix(taskIndependent, table, partn); + String prefix = getAggregationPrefix(table, partn); updateStats(statsAggregator, parameters, prefix, maxPrefixLength, atomic); } @@ -251,23 +248,15 @@ private int aggregateStats() { return ret; } - private String getAggregationPrefix(boolean counter, Table table, Partition partition) + private String getAggregationPrefix(Table table, Partition partition) throws MetaException { - if (!counter && partition == null) { - return work.getAggKey(); - } - StringBuilder prefix = new StringBuilder(); - if (counter) { - // prefix is of the form dbName.tblName - prefix.append(table.getDbName()).append('.').append(table.getTableName()); - } else { - // In case of a non-partitioned table, the key for stats temporary store is "rootDir" - prefix.append(work.getAggKey()); - } + + // prefix is of the form dbName.tblName + String prefix = table.getDbName() + "." + table.getTableName(); if (partition != null) { - return Utilities.join(prefix.toString(), Warehouse.makePartPath(partition.getSpec())); + return Utilities.join(prefix, Warehouse.makePartPath(partition.getSpec())); } - return prefix.toString(); + return prefix; } private StatsAggregator createStatsAggregator(StatsCollectionContext scc) throws HiveException { @@ -336,7 +325,6 @@ private void updateStats(StatsAggregator statsAggregator, } } } - statsAggregator.cleanUp(aggKey); } private void updateQuickStats(Warehouse wh, Map parameters, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java index 83b4969..6e4f474 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java @@ -20,11 +20,9 @@ import java.io.Serializable; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; -import 
java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -38,7 +36,6 @@ import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; -import org.apache.hadoop.hive.ql.stats.StatsCollectionTaskIndependent; import org.apache.hadoop.hive.ql.stats.StatsCollectionContext; import org.apache.hadoop.hive.ql.stats.StatsPublisher; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -59,7 +56,6 @@ private static final long serialVersionUID = 1L; protected transient JobConf jc; - private transient Configuration hconf; private transient boolean inputFileChanged = false; private TableDesc tableDesc; @@ -207,7 +203,6 @@ protected void initializeOp(Configuration hconf) throws HiveException { return; } - this.hconf = hconf; if (hconf instanceof JobConf) { jc = (JobConf) hconf; } else { @@ -291,7 +286,6 @@ private void publishStats() throws HiveException { return; } - String taskID = Utilities.getTaskIdFromFilename(Utilities.getTaskId(hconf)); Map statsToPublish = new HashMap(); for (String pspecs : stats.keySet()) { @@ -300,10 +294,6 @@ private void publishStats() throws HiveException { int maxKeyLength = conf.getMaxStatsKeyPrefixLength(); String key = Utilities.getHashedStatsPrefix(prefix, maxKeyLength); - if (!(statsPublisher instanceof StatsCollectionTaskIndependent)) { - // stats publisher except counter or fs type needs postfix 'taskID' - key = Utilities.join(prefix, taskID); - } for(String statType : stats.get(pspecs).getStoredStats()) { statsToPublish.put(statType, Long.toString(stats.get(pspecs).getStat(statType))); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java index 2a7e979..d06f502 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java @@ -34,7 +34,6 @@ import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileValueBufferWrapper; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.stats.StatsCollectionContext; -import org.apache.hadoop.hive.ql.stats.StatsCollectionTaskIndependent; import org.apache.hadoop.hive.ql.stats.StatsFactory; import org.apache.hadoop.hive.ql.stats.StatsPublisher; import org.apache.hadoop.hive.shims.CombineHiveKey; @@ -157,10 +156,6 @@ private void publishStats() throws HiveException { int maxPrefixLength = StatsFactory.getMaxPrefixLength(jc); // construct key used to store stats in intermediate db String key = Utilities.getHashedStatsPrefix(statsAggKeyPrefix, maxPrefixLength); - if (!(statsPublisher instanceof StatsCollectionTaskIndependent)) { - String taskID = Utilities.getTaskIdFromFilename(Utilities.getTaskId(jc)); - key = Utilities.join(key, taskID); - } // construct statistics to be stored Map statsToPublish = new HashMap(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregator.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregator.java index b9863d9..5440dc3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregator.java @@ -31,7 +31,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RunningJob; -public class CounterStatsAggregator implements 
StatsAggregator, StatsCollectionTaskIndependent { +public class CounterStatsAggregator implements StatsAggregator { private static final Log LOG = LogFactory.getLog(CounterStatsAggregator.class.getName()); @@ -79,9 +79,4 @@ public boolean closeConnection(StatsCollectionContext scc) { } return true; } - - @Override - public boolean cleanUp(String keyPrefix) { - return true; - } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorSpark.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorSpark.java index 4c01b25..303b75c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorSpark.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorSpark.java @@ -25,7 +25,7 @@ import org.apache.hive.spark.counter.SparkCounters; public class CounterStatsAggregatorSpark - implements StatsAggregator, StatsCollectionTaskIndependent { + implements StatsAggregator { private static final Log LOG = LogFactory.getLog(CounterStatsAggregatorSpark.class); @@ -55,9 +55,4 @@ public String aggregateStats(String keyPrefix, String statType) { public boolean closeConnection(StatsCollectionContext scc) { return true; } - - @Override - public boolean cleanUp(String keyPrefix) { - return true; - } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorTez.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorTez.java index 662c106..9a7ad96 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorTez.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorTez.java @@ -30,7 +30,7 @@ * using hadoop counters. They will be published using special keys and * then retrieved on the client after the insert/ctas statement ran. */ -public class CounterStatsAggregatorTez implements StatsAggregator, StatsCollectionTaskIndependent { +public class CounterStatsAggregatorTez implements StatsAggregator { private static final Log LOG = LogFactory.getLog(CounterStatsAggregatorTez.class.getName()); @@ -76,9 +76,4 @@ public String aggregateStats(String keyPrefix, String statType) { public boolean closeConnection(StatsCollectionContext scc) { return true; } - - @Override - public boolean cleanUp(String keyPrefix) { - return true; - } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsPublisher.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsPublisher.java index e5f1400..65c3b6b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsPublisher.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsPublisher.java @@ -25,7 +25,7 @@ import org.apache.hadoop.hive.ql.exec.MapredContext; import org.apache.hadoop.mapred.Reporter; -public class CounterStatsPublisher implements StatsPublisher, StatsCollectionTaskIndependent { +public class CounterStatsPublisher implements StatsPublisher { private static final Log LOG = LogFactory.getLog(CounterStatsPublisher.class.getName()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsAggregator.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsAggregator.java index b115daf..dacf7a9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsAggregator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsAggregator.java @@ -63,20 +63,4 @@ * @return true if close connection is successful, false otherwise. */ public boolean closeConnection(StatsCollectionContext scc); - - /** - * This method is called after all statistics have been aggregated. 
Since we support multiple - * statistics, we do not perform automatic cleanup after aggregation. - * After this method is called, closeConnection must be called as well. - * This method is also used to clear the temporary statistics that have been published without - * being aggregated. - * Typically this happens when a job fails, or is forcibly stopped after publishing some - * statistics. - * - * @param keyPrefix - * a prefix of the keys used in StatsPublisher to publish stats. It is the same - * as the first parameter in aggregateStats(). - * @return true if cleanup is successful, false otherwise. - */ - public boolean cleanUp(String keyPrefix); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsCollectionTaskIndependent.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsCollectionTaskIndependent.java deleted file mode 100644 index 52c06c2..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsCollectionTaskIndependent.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.stats; - -/** This is just a marker interface to differentiate between stats publisher / aggregator - * which don't track stats per task, as oppose to others which do. - */ -public interface StatsCollectionTaskIndependent { - -} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java index b9878a3..053fa18 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java @@ -23,7 +23,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.StatsSetupConst.StatDB; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -90,7 +89,7 @@ private StatsFactory(Configuration conf) { private boolean initialize(String type) { ClassLoader classLoader = Utilities.getSessionSpecifiedClassLoader(); try { - StatDB statDB = type.startsWith("jdbc") ? 
StatDB.jdbc : StatDB.valueOf(type); + StatDB statDB = StatDB.valueOf(type); publisherImplementation = (Class) Class.forName(statDB.getPublisher(jobConf), true, classLoader); aggregatorImplementation = (Class) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java index 6dfc178..f5303ae 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java @@ -34,11 +34,10 @@ import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.stats.StatsAggregator; import org.apache.hadoop.hive.ql.stats.StatsCollectionContext; -import org.apache.hadoop.hive.ql.stats.StatsCollectionTaskIndependent; import com.esotericsoftware.kryo.io.Input; -public class FSStatsAggregator implements StatsAggregator, StatsCollectionTaskIndependent { +public class FSStatsAggregator implements StatsAggregator { private final Log LOG = LogFactory.getLog(this.getClass().getName()); private List>> statsList; private Map> statsMap; @@ -112,9 +111,4 @@ public boolean closeConnection(StatsCollectionContext scc) { return true; } } - - @Override - public boolean cleanUp(String keyPrefix) { - return true; - } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsPublisher.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsPublisher.java index aa2bf62..e5a907c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsPublisher.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsPublisher.java @@ -30,13 +30,12 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.stats.StatsCollectionTaskIndependent; import org.apache.hadoop.hive.ql.stats.StatsCollectionContext; import org.apache.hadoop.hive.ql.stats.StatsPublisher; import com.esotericsoftware.kryo.io.Output; -public class FSStatsPublisher implements StatsPublisher, StatsCollectionTaskIndependent { +public class FSStatsPublisher implements StatsPublisher { private Configuration conf; private final Log LOG = LogFactory.getLog(this.getClass().getName()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java deleted file mode 100644 index d8c9926..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java +++ /dev/null @@ -1,262 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.ql.stats.jdbc; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.SQLRecoverableException; -import java.util.HashMap; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.TimeUnit; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.common.JavaUtils; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.stats.StatsAggregator; -import org.apache.hadoop.hive.ql.stats.StatsCollectionContext; - -public class JDBCStatsAggregator implements StatsAggregator { - - private Connection conn; - private String connectionString; - private Configuration hiveconf; - private final Map columnMapping; - private final Log LOG = LogFactory.getLog(this.getClass().getName()); - private int timeout = 30; - private final String comment = "Hive stats aggregation: " + this.getClass().getName(); - private int maxRetries; - private long waitWindow; - private final Random r; - - public JDBCStatsAggregator() { - columnMapping = new HashMap(); - r = new Random(); - } - - @Override - public boolean connect(StatsCollectionContext scc) { - this.hiveconf = scc.getHiveConf(); - timeout = (int) HiveConf.getTimeVar( - hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT, TimeUnit.SECONDS); - connectionString = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING); - String driver = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER); - maxRetries = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_MAX); - waitWindow = HiveConf.getTimeVar( - hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_WAIT, TimeUnit.MILLISECONDS); - - try { - JavaUtils.loadClass(driver).newInstance(); - } catch (Exception e) { - LOG.error("Error during instantiating JDBC driver " + driver + ". 
", e); - return false; - } - - // stats is non-blocking -- throw an exception when timeout - DriverManager.setLoginTimeout(timeout); - // function pointer for executeWithRetry to setQueryTimeout - Utilities.SQLCommand setQueryTimeout = new Utilities.SQLCommand() { - @Override - public Void run(PreparedStatement stmt) throws SQLException { - Utilities.setQueryTimeout(stmt, timeout); - return null; - } - }; - - // retry connection and statement preparations - for (int failures = 0;; failures++) { - try { - conn = Utilities.connectWithRetry(connectionString, waitWindow, maxRetries); - - for (String statType : JDBCStatsUtils.getSupportedStatistics()) { - // prepare statements - PreparedStatement selStmt = Utilities.prepareWithRetry(conn, - JDBCStatsUtils.getSelectAggr(statType, comment), waitWindow, maxRetries); - columnMapping.put(statType, selStmt); - // set query timeout - Utilities.executeWithRetry(setQueryTimeout, selStmt, waitWindow, failures); - } - return true; - } catch (SQLRecoverableException e) { - if (failures > maxRetries) { - LOG.error("Error during JDBC connection and preparing statement: " + e); - return false; - } - long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, r); - try { - Thread.sleep(waitTime); - } catch (InterruptedException e1) { - } - } catch (SQLException e) { - // for SQLTransientException (maxRetries already achieved at Utilities retry functions - // or SQLNonTransientException, declare a real failure - return false; - } - } - } - - @Override - public String aggregateStats(String fileID, String statType) { - - if (!JDBCStatsUtils.isValidStatistic(statType)) { - LOG.warn("Invalid statistic: " + statType + ", supported stats: " + - JDBCStatsUtils.getSupportedStatistics()); - return null; - } - - Utilities.SQLCommand execQuery = new Utilities.SQLCommand() { - @Override - public ResultSet run(PreparedStatement stmt) throws SQLException { - return stmt.executeQuery(); - } - }; - - JDBCStatsUtils.validateRowId(fileID); - String keyPrefix = Utilities.escapeSqlLike(fileID) + "%"; - for (int failures = 0;; failures++) { - try { - long retval = 0; - - PreparedStatement selStmt = columnMapping.get(statType); - selStmt.setString(1, keyPrefix); - selStmt.setString(2, Character.toString(Utilities.sqlEscapeChar)); - - ResultSet result = Utilities.executeWithRetry(execQuery, selStmt, waitWindow, maxRetries); - if (result.next()) { - retval = result.getLong(1); - } else { - LOG.warn("Nothing published. Nothing aggregated."); - return null; - } - return Long.toString(retval); - } catch (SQLRecoverableException e) { - // need to start from scratch (connection) - if (failures >= maxRetries) { - return null; - } - // close the current connection - closeConnection(null); - long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, r); - try { - Thread.sleep(waitTime); - } catch (InterruptedException iex) { - } - // getting a new connection - if (!connect(new StatsCollectionContext(hiveconf))) { - // if cannot reconnect, just fail because connect() already handles retries. - LOG.error("Error during publishing aggregation. " + e); - return null; - } - } catch (SQLException e) { - // for SQLTransientException (already handled by Utilities.*WithRetries() functions - // and SQLNonTransientException, just declare failure. - LOG.error("Error during publishing aggregation. 
" + e); - return null; - } - } - } - - @Override - public boolean closeConnection(StatsCollectionContext scc) { - - if (conn == null) { - return true; - } - - try { - conn.close(); - // In case of derby, explicitly close the database connection - if (HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCLASS).equalsIgnoreCase( - "jdbc:derby")) { - try { - // The following closes the derby connection. It throws an exception that has to be caught - // and ignored. - DriverManager.getConnection(connectionString + ";shutdown=true"); - } catch (Exception e) { - // Do nothing because we know that an exception is thrown anyway. - } - } - return true; - } catch (SQLException e) { - LOG.error("Error during JDBC termination. " + e); - return false; - } - } - - @Override - public boolean cleanUp(String rowID) { - - Utilities.SQLCommand execUpdate = new Utilities.SQLCommand() { - @Override - public Void run(PreparedStatement stmt) throws SQLException { - stmt.executeUpdate(); - return null; - } - }; - try { - - JDBCStatsUtils.validateRowId(rowID); - String keyPrefix = Utilities.escapeSqlLike(rowID) + "%"; - - PreparedStatement delStmt = Utilities.prepareWithRetry(conn, - JDBCStatsUtils.getDeleteAggr(rowID, comment), waitWindow, maxRetries); - delStmt.setString(1, keyPrefix); - delStmt.setString(2, Character.toString(Utilities.sqlEscapeChar)); - - for (int failures = 0;; failures++) { - try { - Utilities.executeWithRetry(execUpdate, delStmt, waitWindow, maxRetries); - return true; - } catch (SQLRecoverableException e) { - // need to start from scratch (connection) - if (failures >= maxRetries) { - LOG.error("Error during clean-up after " + maxRetries + " retries. " + e); - return false; - } - // close the current connection - closeConnection(null); - long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, r); - try { - Thread.sleep(waitTime); - } catch (InterruptedException iex) { - } - // getting a new connection - if (!connect(new StatsCollectionContext(hiveconf))) { - LOG.error("Error during clean-up. " + e); - return false; - } - } catch (SQLException e) { - // for SQLTransientException (already handled by Utilities.*WithRetries() functions - // and SQLNonTransientException, just declare failure. - LOG.error("Error during clean-up. " + e); - return false; - } - } - } catch (SQLException e) { - LOG.error("Error during publishing aggregation. " + e); - return false; - } - } -} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java deleted file mode 100644 index 0318a8c..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java +++ /dev/null @@ -1,349 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.ql.stats.jdbc; - -import java.sql.Connection; -import java.sql.DatabaseMetaData; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.SQLIntegrityConstraintViolationException; -import java.sql.SQLRecoverableException; -import java.sql.Statement; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.TimeUnit; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.common.JavaUtils; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.stats.StatsCollectionContext; -import org.apache.hadoop.hive.ql.stats.StatsPublisher; - -public class JDBCStatsPublisher implements StatsPublisher { - - private Connection conn; - private String connectionString; - private Configuration hiveconf; - private final Log LOG = LogFactory.getLog(this.getClass().getName()); - private PreparedStatement updStmt, insStmt; - private int timeout; // default timeout in sec. for JDBC connection and statements - // SQL comment that identifies where the SQL statement comes from - private final String comment = "Hive stats publishing: " + this.getClass().getName(); - private int maxRetries; - private long waitWindow; - private final Random r; - - public JDBCStatsPublisher() { - r = new Random(); - } - - @Override - public boolean connect(StatsCollectionContext context) { - this.hiveconf = context.getHiveConf(); - maxRetries = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_MAX); - waitWindow = HiveConf.getTimeVar( - hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_WAIT, TimeUnit.MILLISECONDS); - connectionString = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING); - timeout = (int) HiveConf.getTimeVar( - hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT, TimeUnit.SECONDS); - String driver = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER); - - try { - JavaUtils.loadClass(driver).newInstance(); - } catch (Exception e) { - LOG.error("Error during instantiating JDBC driver " + driver + ". ", e); - return false; - } - - DriverManager.setLoginTimeout(timeout); // stats is non-blocking - - // function pointer for executeWithRetry to setQueryTimeout - Utilities.SQLCommand setQueryTimeout = new Utilities.SQLCommand() { - @Override - public Void run(PreparedStatement stmt) throws SQLException { - Utilities.setQueryTimeout(stmt, timeout); - return null; - } - }; - - for (int failures = 0;; failures++) { - try { - conn = Utilities.connectWithRetry(connectionString, waitWindow, maxRetries); - - // prepare statements - updStmt = Utilities.prepareWithRetry(conn, JDBCStatsUtils.getUpdate(comment), waitWindow, - maxRetries); - insStmt = Utilities.prepareWithRetry(conn, JDBCStatsUtils.getInsert(comment), waitWindow, - maxRetries); - - // set query timeout - Utilities.executeWithRetry(setQueryTimeout, updStmt, waitWindow, maxRetries); - Utilities.executeWithRetry(setQueryTimeout, insStmt, waitWindow, maxRetries); - - - return true; - } catch (SQLRecoverableException e) { - if (failures >= maxRetries) { - LOG.error("Error during JDBC connection to " + connectionString + ". 
", e); - return false; // just return false without fail the task - } - long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, r); - try { - Thread.sleep(waitTime); - } catch (InterruptedException e1) { - } - } catch (SQLException e) { - // for SQLTransientException (maxRetries already achieved at Utilities retry functions - // or SQLNonTransientException, declare a real failure - LOG.error("Error during JDBC connection to " + connectionString + ". ", e); - return false; - } - } - } - - @Override - public boolean publishStat(String fileID, Map stats) { - - if (stats.isEmpty()) { - // If there are no stats to publish, nothing to do. - return true; - } - - if (conn == null) { - LOG.error("JDBC connection is null. Cannot publish stats without JDBC connection."); - return false; - } - - if (!JDBCStatsUtils.isValidStatisticSet(stats.keySet())) { - LOG.warn("Invalid statistic:" + stats.keySet().toString() + ", supported " - + " stats: " + JDBCStatsUtils.getSupportedStatistics()); - return false; - } - JDBCStatsUtils.validateRowId(fileID); - if (LOG.isInfoEnabled()) { - LOG.info("Stats publishing for key " + fileID); - } - - Utilities.SQLCommand execUpdate = new Utilities.SQLCommand() { - @Override - public Void run(PreparedStatement stmt) throws SQLException { - stmt.executeUpdate(); - return null; - } - }; - - List supportedStatistics = JDBCStatsUtils.getSupportedStatistics(); - - for (int failures = 0;; failures++) { - try { - insStmt.setString(1, fileID); - for (int i = 0; i < JDBCStatsUtils.getSupportedStatistics().size(); i++) { - insStmt.setString(i + 2, stats.get(supportedStatistics.get(i))); - } - Utilities.executeWithRetry(execUpdate, insStmt, waitWindow, maxRetries); - return true; - } catch (SQLIntegrityConstraintViolationException e) { - - // We assume that the table used for partial statistics has a primary key declared on the - // "fileID". The exception will be thrown if two tasks report results for the same fileID. - // In such case, we either update the row, or abandon changes depending on which statistic - // is newer. - - for (int updateFailures = 0;; updateFailures++) { - try { - int i; - for (i = 0; i < JDBCStatsUtils.getSupportedStatistics().size(); i++) { - updStmt.setString(i + 1, stats.get(supportedStatistics.get(i))); - } - updStmt.setString(supportedStatistics.size() + 1, fileID); - updStmt.setString(supportedStatistics.size() + 2, - stats.get(JDBCStatsUtils.getBasicStat())); - updStmt.setString(supportedStatistics.size() + 3, fileID); - Utilities.executeWithRetry(execUpdate, updStmt, waitWindow, maxRetries); - return true; - } catch (SQLRecoverableException ue) { - // need to start from scratch (connection) - if (!handleSQLRecoverableException(ue, updateFailures)) { - return false; - } - } catch (SQLException ue) { - LOG.error("Error during publishing statistics. ", e); - return false; - } - } - - } catch (SQLRecoverableException e) { - // need to start from scratch (connection) - if (!handleSQLRecoverableException(e, failures)) { - return false; - } - } catch (SQLException e) { - LOG.error("Error during publishing statistics. 
", e); - return false; - } - } - } - - private boolean handleSQLRecoverableException(Exception e, int failures) { - if (failures >= maxRetries) { - return false; - } - StatsCollectionContext sCntxt = new StatsCollectionContext(hiveconf); - // close the current connection - closeConnection(sCntxt); - long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, r); - try { - Thread.sleep(waitTime); - } catch (InterruptedException iex) { - } - // get a new connection - if (!connect(sCntxt)) { - // if cannot reconnect, just fail because connect() already handles retries. - LOG.error("Error during publishing aggregation. " + e); - return false; - } - return true; - } - - @Override - public boolean closeConnection(StatsCollectionContext context) { - if (conn == null) { - return true; - } - try { - if (updStmt != null) { - updStmt.close(); - } - if (insStmt != null) { - insStmt.close(); - } - - conn.close(); - - // In case of derby, explicitly shutdown the database otherwise it reports error when - // trying to connect to the same JDBC connection string again. - if (HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCLASS).equalsIgnoreCase( - "jdbc:derby")) { - try { - // The following closes the derby connection. It throws an exception that has to be caught - // and ignored. - synchronized(DriverManager.class) { - DriverManager.getConnection(connectionString + ";shutdown=true"); - } - } catch (Exception e) { - // Do nothing because we know that an exception is thrown anyway. - } - } - return true; - } catch (SQLException e) { - LOG.error("Error during JDBC termination. ", e); - return false; - } - } - - /** - * Initialize the intermediate stats DB for the first time it is running (e.g., - * creating tables.). - */ - @Override - public boolean init(StatsCollectionContext context) { - Statement stmt = null; - ResultSet rs = null; - try { - this.hiveconf = context.getHiveConf(); - connectionString = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING); - String driver = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER); - JavaUtils.loadClass(driver).newInstance(); - synchronized(DriverManager.class) { - DriverManager.setLoginTimeout(timeout); - conn = DriverManager.getConnection(connectionString); - - stmt = conn.createStatement(); - Utilities.setQueryTimeout(stmt, timeout); - - // TODO: why is this not done using Hive db scripts? - // Check if the table exists - DatabaseMetaData dbm = conn.getMetaData(); - String tableName = JDBCStatsUtils.getStatTableName(); - rs = dbm.getTables(null, null, tableName, null); - boolean tblExists = rs.next(); - if (!tblExists) { // Table does not exist, create it - String createTable = JDBCStatsUtils.getCreate(""); - try { - stmt.executeUpdate(createTable); - } catch (SQLException ex) { - String msg = ex.getMessage(); - if (msg != null && msg.contains("Specified key was too long")) { - throw new RuntimeException(msg + "; try using innodb with " - + "Barracuda file format and innodb_large_prefix", ex); - } - throw ex; - } - } else { - // Upgrade column name to allow for longer paths. 
- String idColName = JDBCStatsUtils.getIdColumnName(); - int colSize = -1; - try { - rs.close(); - rs = dbm.getColumns(null, null, tableName, idColName); - if (rs.next()) { - colSize = rs.getInt("COLUMN_SIZE"); - if (colSize < JDBCStatsSetupConstants.ID_COLUMN_VARCHAR_SIZE) { - String alterTable = JDBCStatsUtils.getAlterIdColumn(); - stmt.executeUpdate(alterTable); - } - } else { - LOG.warn("Failed to update " + idColName + " - column not found"); - } - } catch (Throwable t) { - LOG.warn("Failed to update " + idColName + " (size " - + (colSize == -1 ? "unknown" : colSize) + ")", t); - } - } - } - } catch (Exception e) { - LOG.error("Error during JDBC initialization. ", e); - return false; - } finally { - if(rs != null) { - try { - rs.close(); - } catch (SQLException e) { - // do nothing - } - } - if(stmt != null) { - try { - stmt.close(); - } catch (SQLException e) { - // do nothing - } - } - closeConnection(context); - } - return true; - } - -} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsSetupConstants.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsSetupConstants.java deleted file mode 100644 index e39fc5b..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsSetupConstants.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.stats.jdbc; - -public final class JDBCStatsSetupConstants { - - public static final String PART_STAT_ID_COLUMN_NAME = "ID"; - - public static final String PART_STAT_TIMESTAMP_COLUMN_NAME = "TS"; - - // NOTE: - // For all table names past and future, Hive will not drop old versions of this table, it is up - // to the administrator - public static final String PART_STAT_TABLE_NAME = "PARTITION_STATS_V2"; - - // supported statistics - column names - - public static final String PART_STAT_ROW_COUNT_COLUMN_NAME = "ROW_COUNT"; - - public static final String PART_STAT_RAW_DATA_SIZE_COLUMN_NAME = "RAW_DATA_SIZE"; - - // MySQL - 3072/3 (innodb+utf8), SQL Server - 8000, Oracle - 4000, Derby - 32762, Postgres - large. - public static final int ID_COLUMN_VARCHAR_SIZE = 1000; -} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsUtils.java deleted file mode 100644 index 59d94d5..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsUtils.java +++ /dev/null @@ -1,212 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.stats.jdbc; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.hive.common.StatsSetupConst; - -public class JDBCStatsUtils { - - private static final List supportedStats = new ArrayList(); - private static final Map columnNameMapping = new HashMap(); - static { - // supported statistics - supportedStats.add(StatsSetupConst.ROW_COUNT); - supportedStats.add(StatsSetupConst.RAW_DATA_SIZE); - - // row count statistics - columnNameMapping.put(StatsSetupConst.ROW_COUNT, - JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME); - - // raw data size - columnNameMapping.put(StatsSetupConst.RAW_DATA_SIZE, - JDBCStatsSetupConstants.PART_STAT_RAW_DATA_SIZE_COLUMN_NAME); - } - - /** - * Returns the set of supported statistics - */ - public static List getSupportedStatistics() { - return supportedStats; - } - - /** - * Check if the set to be published is within the supported statistics. - * It must also contain at least the basic statistics (used for comparison) - * - * @param stats - * - stats to be published - * @return true if is a valid statistic set, false otherwise - */ - - public static boolean isValidStatisticSet(Collection stats) { - if (!stats.contains(getBasicStat())) { - return false; - } - for (String stat : stats) { - if (!supportedStats.contains(stat)) { - return false; - } - } - return true; - } - - /** - * Check if a particular statistic type is supported - * - * @param statType - * - statistic to be published - * @return true if statType is supported, false otherwise - */ - public static boolean isValidStatistic(String statType) { - return supportedStats.contains(statType); - } - - /** - * Returns the name of the column storing the key for statistics. - */ - public static String getIdColumnName() { - return JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME; - } - - public static String getTimestampColumnName() { - return JDBCStatsSetupConstants.PART_STAT_TIMESTAMP_COLUMN_NAME; - } - - public static String getStatTableName() { - return JDBCStatsSetupConstants.PART_STAT_TABLE_NAME; - } - - /** - * Returns the column where the statistics for the given type are stored. - * - * @param statType - * - supported statistic. - * @return column name for the given statistic. - */ - public static String getStatColumnName(String statType) { - return columnNameMapping.get(statType); - } - - /** - * Returns the basic type of the supported statistics. - * It is used to determine which statistics are fresher. 
- */ - public static String getBasicStat() { - return supportedStats.get(0); - } - - - - - - /** - * Prepares CREATE TABLE query - */ - public static String getCreate(String comment) { - String create = "CREATE TABLE /* " + comment + " */ " + JDBCStatsUtils.getStatTableName() - + " (" + getTimestampColumnName() + " TIMESTAMP DEFAULT CURRENT_TIMESTAMP, " - + JDBCStatsUtils.getIdColumnName() + " VARCHAR(" - + JDBCStatsSetupConstants.ID_COLUMN_VARCHAR_SIZE + ") PRIMARY KEY "; - for (int i = 0; i < supportedStats.size(); i++) { - create += ", " + getStatColumnName(supportedStats.get(i)) + " BIGINT "; - } - create += ")"; - return create; - } - - /** - * Prepares ALTER TABLE query - */ - public static String getAlterIdColumn() { - return "ALTER TABLE " + JDBCStatsUtils.getStatTableName() + " ALTER COLUMN " - + JDBCStatsUtils.getIdColumnName() + " VARCHAR(" - + JDBCStatsSetupConstants.ID_COLUMN_VARCHAR_SIZE + ")"; - } - - /** - * Prepares UPDATE statement issued when updating existing statistics - */ - public static String getUpdate(String comment) { - String update = "UPDATE /* " + comment + " */ " + getStatTableName() + " SET "; - for (int i = 0; i < supportedStats.size(); i++) { - update += columnNameMapping.get(supportedStats.get(i)) + " = ? , "; - } - update += getTimestampColumnName() + " = CURRENT_TIMESTAMP"; - update += " WHERE " + JDBCStatsUtils.getIdColumnName() + " = ? AND ? > ( SELECT TEMP." - + getStatColumnName(getBasicStat()) + " FROM ( " + - " SELECT " + getStatColumnName(getBasicStat()) + " FROM " + getStatTableName() + " WHERE " - + getIdColumnName() + " = ? ) TEMP )"; - return update; - } - - /** - * Prepares INSERT statement for statistic publishing. - */ - public static String getInsert(String comment) { - String columns = JDBCStatsUtils.getIdColumnName(); - String values = "?"; - - for (int i = 0; i < supportedStats.size(); i++) { - columns += ", " + getStatColumnName(supportedStats.get(i)); - values += ", ?"; - } - String insert = "INSERT INTO /* " + comment + " */ " + getStatTableName() + "(" + columns + - ") VALUES (" + values + ")"; - return insert; - } - - /** - * Prepares SELECT query for statistics aggregation. - * - * @param statType - * - statistic type to be aggregated. - * @param comment - * @return aggregated value for the given statistic - */ - public static String getSelectAggr(String statType, String comment) { - String select = "SELECT /* " + comment + " */ " + "SUM( " - + getStatColumnName(statType) + " ) " + " FROM " - + getStatTableName() + " WHERE " + JDBCStatsUtils.getIdColumnName() + " LIKE ? ESCAPE ?"; - return select; - } - - /** - * Prepares DELETE statement for cleanup. - */ - public static String getDeleteAggr(String rowID, String comment) { - String delete = "DELETE /* " + comment + " */ " + - " FROM " + getStatTableName() + " WHERE " + JDBCStatsUtils.getIdColumnName() + - " LIKE ? ESCAPE ?"; - return delete; - } - - /** - * Make sure the row ID fits into the row ID column in the table. 
-   */
-  public static void validateRowId(String rowId) {
-    if (rowId.length() > JDBCStatsSetupConstants.ID_COLUMN_VARCHAR_SIZE) {
-      throw new RuntimeException("ID is too big, client should have truncated it: " + rowId);
-    }
-  }
-}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
index d22d022..c73c620 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
@@ -893,10 +893,5 @@ public String aggregateStats(String keyPrefix, String statType) {
     public boolean closeConnection(StatsCollectionContext scc) {
       return true;
     }
-
-    @Override
-    public boolean cleanUp(String keyPrefix) {
-      return true;
-    }
   }
 }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestStatsPublisherEnhanced.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestStatsPublisherEnhanced.java
deleted file mode 100644
index c257797..0000000
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestStatsPublisherEnhanced.java
+++ /dev/null
@@ -1,384 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import junit.framework.TestCase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.StatsSetupConst;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.stats.StatsAggregator;
-import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
-import org.apache.hadoop.hive.ql.stats.StatsFactory;
-import org.apache.hadoop.hive.ql.stats.StatsPublisher;
-import org.apache.hadoop.mapred.JobConf;
-
-/**
- * TestPublisher jdbc.
- *
- */
-public class TestStatsPublisherEnhanced extends TestCase {
-
-  protected Configuration conf;
-  protected String statsImplementationClass;
-  protected Map<String, String> stats;
-
-  protected StatsFactory factory;
-
-  public TestStatsPublisherEnhanced(String name) {
-    super(name);
-    conf = new JobConf(TestStatsPublisherEnhanced.class);
-    conf.set("hive.stats.dbclass", "jdbc:derby");
-    factory = StatsFactory.newFactory(conf);
-    assert factory != null;
-  }
-
-  @Override
-  protected void setUp() {
-    stats = new HashMap<String, String>();
-  }
-
-  @Override
-  protected void tearDown() {
-    StatsAggregator sa = factory.getStatsAggregator();
-    assertNotNull(sa);
-    StatsCollectionContext sc = new StatsCollectionContext(conf);
-    assertTrue(sa.connect(sc));
-    assertTrue(sa.cleanUp("file_0"));
-    assertTrue(sa.closeConnection(sc));
-  }
-
-  private void fillStatMap(String numRows, String rawDataSize) {
-    stats.clear();
-    stats.put(StatsSetupConst.ROW_COUNT, numRows);
-    if (!rawDataSize.equals("")) {
-      stats.put(StatsSetupConst.RAW_DATA_SIZE, rawDataSize);
-    }
-  }
-
-  public void testStatsPublisherOneStat() throws Throwable {
-    try {
-      System.out.println("StatsPublisher - one stat published per key - aggregating matching key");
-
-      // instantiate stats publisher
-      StatsPublisher statsPublisher = Utilities.getStatsPublisher((JobConf) conf);
-      assertNotNull(statsPublisher);
-      StatsCollectionContext sc = new StatsCollectionContext(conf);
-      assertTrue(statsPublisher.init(sc));
-      assertTrue(statsPublisher.connect(sc));
-
-      // instantiate stats aggregator
-      StatsAggregator statsAggregator = factory.getStatsAggregator();
-      assertNotNull(statsAggregator);
-      assertTrue(statsAggregator.connect(sc));
-
-      // publish stats
-      fillStatMap("200", "1000");
-      assertTrue(statsPublisher.publishStat("file_00000", stats));
-      fillStatMap("400", "3000");
-      assertTrue(statsPublisher.publishStat("file_00001", stats));
-
-
-      // aggregate existing stats
-      String rows0 = statsAggregator.aggregateStats("file_00000", StatsSetupConst.ROW_COUNT);
-      assertEquals("200", rows0);
-      String usize0 = statsAggregator.aggregateStats("file_00000",
-          StatsSetupConst.RAW_DATA_SIZE);
-      assertEquals("1000", usize0);
-
-      String rows1 = statsAggregator.aggregateStats("file_00001", StatsSetupConst.ROW_COUNT);
-      assertEquals("400", rows1);
-      String usize1 = statsAggregator.aggregateStats("file_00001",
-          StatsSetupConst.RAW_DATA_SIZE);
-      assertEquals("3000", usize1);
-
-      // close connections
-      assertTrue(statsPublisher.closeConnection(sc));
-      assertTrue(statsAggregator.closeConnection(sc));
-
-      System.out
-          .println("StatsPublisher - one stat published per key - aggregating matching key - OK");
-    } catch (Throwable e) {
-      e.printStackTrace();
-      throw e;
-    }
-  }
-
-  public void testStatsPublisher() throws Throwable {
-    try {
-      System.out.println("StatsPublisher - basic functionality");
-
-      // instantiate stats publisher
-      StatsPublisher statsPublisher = Utilities.getStatsPublisher(
-          (JobConf) conf);
-      assertNotNull(statsPublisher);
-      StatsCollectionContext sc = new StatsCollectionContext(conf);
-      assertTrue(statsPublisher.init(sc));
-      assertTrue(statsPublisher.connect(sc));
-
-      // instantiate stats aggregator
-      StatsAggregator statsAggregator = factory.getStatsAggregator();
-      assertNotNull(statsAggregator);
-      assertTrue(statsAggregator.connect(sc));
-      // statsAggregator.cleanUp("file_0000");
-      // assertTrue(statsAggregator.connect(conf));
-
-      // publish stats
-      fillStatMap("200", "1000");
-      assertTrue(statsPublisher.publishStat("file_00000_a", stats));
-      fillStatMap("300", "2000");
-      assertTrue(statsPublisher.publishStat("file_00000_b", stats));
-
-      fillStatMap("400", "3000");
-      assertTrue(statsPublisher.publishStat("file_00001_a", stats));
-      fillStatMap("500", "4000");
-      assertTrue(statsPublisher.publishStat("file_00001_b", stats));
-
-      // aggregate existing stats
-      String rows0 = statsAggregator.aggregateStats("file_00000", StatsSetupConst.ROW_COUNT);
-      assertEquals("500", rows0);
-      String usize0 = statsAggregator.aggregateStats("file_00000",
-          StatsSetupConst.RAW_DATA_SIZE);
-      assertEquals("3000", usize0);
-
-      String rows1 = statsAggregator.aggregateStats("file_00001", StatsSetupConst.ROW_COUNT);
-      assertEquals("900", rows1);
-      String usize1 = statsAggregator.aggregateStats("file_00001",
-          StatsSetupConst.RAW_DATA_SIZE);
-      assertEquals("7000", usize1);
-
-      // aggregate non-existent stats
-      String rowsX = statsAggregator.aggregateStats("file_00002", StatsSetupConst.ROW_COUNT);
-      assertEquals("0", rowsX);
-      String usizeX = statsAggregator.aggregateStats("file_00002",
-          StatsSetupConst.RAW_DATA_SIZE);
-      assertEquals("0", usizeX);
-
-      assertTrue(statsAggregator.cleanUp("file_0000"));
-
-      // close connections
-      assertTrue(statsPublisher.closeConnection(sc));
-      assertTrue(statsAggregator.closeConnection(sc));
-
-      System.out.println("StatsPublisher - basic functionality - OK");
-    } catch (Throwable e) {
-      e.printStackTrace();
-      throw e;
-    }
-  }
-
-  public void testStatsPublisherMultipleUpdates() throws Throwable {
-    try {
-      System.out.println("StatsPublisher - multiple updates");
-
-      // instantiate stats publisher
-      StatsPublisher statsPublisher = Utilities.getStatsPublisher((JobConf) conf);
-      assertNotNull(statsPublisher);
-      StatsCollectionContext sc = new StatsCollectionContext(conf);
-      assertTrue(statsPublisher.init(sc));
-      assertTrue(statsPublisher.connect(sc));
-
-      // instantiate stats aggregator
-      StatsAggregator statsAggregator = factory.getStatsAggregator();
-      assertNotNull(statsAggregator);
-      assertTrue(statsAggregator.connect(sc));
-
-      // publish stats
-      fillStatMap("200", "1000");
-      assertTrue(statsPublisher.publishStat("file_00000_a", stats));
-      fillStatMap("300", "2000");
-      assertTrue(statsPublisher.publishStat("file_00000_b", stats));
-
-      fillStatMap("400", "3000");
-      assertTrue(statsPublisher.publishStat("file_00001_a", stats));
-      fillStatMap("500", "4000");
-      assertTrue(statsPublisher.publishStat("file_00001_b", stats));
-
-      // update which should not take any effect
-      fillStatMap("190", "1000");
-      assertTrue(statsPublisher.publishStat("file_00000_a", stats));
-      fillStatMap("290", "2000");
-      assertTrue(statsPublisher.publishStat("file_00000_b", stats));
-
-      // update that should take effect
-      fillStatMap("500", "5000");
-      assertTrue(statsPublisher.publishStat("file_00001_a", stats));
-      fillStatMap("600", "6000");
-      assertTrue(statsPublisher.publishStat("file_00001_b", stats));
-
-      // aggregate existing stats
-      String rows0 = statsAggregator.aggregateStats("file_00000", StatsSetupConst.ROW_COUNT);
-      assertEquals("500", rows0);
-      String usize0 = statsAggregator.aggregateStats("file_00000",
-          StatsSetupConst.RAW_DATA_SIZE);
-      assertEquals("3000", usize0);
-
-      String rows1 = statsAggregator.aggregateStats("file_00001", StatsSetupConst.ROW_COUNT);
-      assertEquals("1100", rows1);
-      String usize1 = statsAggregator.aggregateStats("file_00001",
-          StatsSetupConst.RAW_DATA_SIZE);
-      assertEquals("11000", usize1);
-
-      assertTrue(statsAggregator.cleanUp("file_0000"));
-
-      // close connections
-      assertTrue(statsPublisher.closeConnection(sc));
-      assertTrue(statsAggregator.closeConnection(sc));
-
-      System.out.println("StatsPublisher - multiple updates - OK");
-    } catch (Throwable e) {
-      e.printStackTrace();
-      throw e;
-    }
-  }
-
-  public void testStatsPublisherMultipleUpdatesSubsetStatistics() throws Throwable {
-    try {
-      System.out
-          .println("StatsPublisher - (multiple updates + publishing subset of supported statistics)");
-
-      // instantiate stats publisher
-      StatsPublisher statsPublisher = Utilities.getStatsPublisher((JobConf) conf);
-      assertNotNull(statsPublisher);
-      StatsCollectionContext sc = new StatsCollectionContext(conf);
-      assertTrue(statsPublisher.init(sc));
-      assertTrue(statsPublisher.connect(sc));
-
-      // instantiate stats aggregator
-      StatsAggregator statsAggregator = factory.getStatsAggregator();
-      assertNotNull(statsAggregator);
-      assertTrue(statsAggregator.connect(sc));
-
-      // publish stats
-      fillStatMap("200", "");
-      assertTrue(statsPublisher.publishStat("file_00000_a", stats));
-      fillStatMap("300", "2000");
-      assertTrue(statsPublisher.publishStat("file_00000_b", stats));
-
-
-      // aggregate existing stats
-      String rows0 = statsAggregator.aggregateStats("file_00000", StatsSetupConst.ROW_COUNT);
-      assertEquals("500", rows0);
-      String usize0 = statsAggregator.aggregateStats("file_00000",
-          StatsSetupConst.RAW_DATA_SIZE);
-      assertEquals("2000", usize0);
-
-      // update which should not take any effect - plus the map published is a supset of supported
-      // stats
-      fillStatMap("190", "");
-      assertTrue(statsPublisher.publishStat("file_00000_a", stats));
-      fillStatMap("290", "");
-      assertTrue(statsPublisher.publishStat("file_00000_b", stats));
-
-      // nothing changed
-      rows0 = statsAggregator.aggregateStats("file_00000", StatsSetupConst.ROW_COUNT);
-      assertEquals("500", rows0);
-      usize0 = statsAggregator.aggregateStats("file_00000",
-          StatsSetupConst.RAW_DATA_SIZE);
-      assertEquals("2000", usize0);
-
-      fillStatMap("500", "");
-      assertTrue(statsPublisher.publishStat("file_00000_a", stats));
-      fillStatMap("500", "");
-      assertTrue(statsPublisher.publishStat("file_00000_b", stats));
-
-      // changed + the rawDataSize size was overwriten !!!
-      rows0 = statsAggregator.aggregateStats("file_00000", StatsSetupConst.ROW_COUNT);
-      assertEquals("1000", rows0);
-      usize0 = statsAggregator.aggregateStats("file_00000",
-          StatsSetupConst.RAW_DATA_SIZE);
-      assertEquals("0", usize0);
-
-      assertTrue(statsAggregator.cleanUp("file_0000"));
-
-      // close connections
-      assertTrue(statsPublisher.closeConnection(sc));
-      assertTrue(statsAggregator.closeConnection(sc));
-
-      System.out
-          .println("StatsPublisher - (multiple updates + publishing subset of supported statistics) - OK");
-    } catch (Throwable e) {
-      e.printStackTrace();
-      throw e;
-    }
-  }
-
-
-
-  public void testStatsAggregatorCleanUp() throws Throwable {
-    try {
-      System.out.println("StatsAggregator - clean-up");
-
-      // instantiate stats publisher
-      StatsPublisher statsPublisher = Utilities.getStatsPublisher((JobConf) conf);
-      assertNotNull(statsPublisher);
-      StatsCollectionContext sc = new StatsCollectionContext(conf);
-      assertTrue(statsPublisher.init(sc));
-      assertTrue(statsPublisher.connect(sc));
-
-      // instantiate stats aggregator
-      StatsAggregator statsAggregator = factory.getStatsAggregator();
-      assertNotNull(statsAggregator);
-      assertTrue(statsAggregator.connect(sc));
-
-      // publish stats
-      fillStatMap("200", "1000");
-      assertTrue(statsPublisher.publishStat("file_00000_a", stats));
-      fillStatMap("300", "2000");
-      assertTrue(statsPublisher.publishStat("file_00000_b", stats));
-
-      fillStatMap("400", "3000");
-      assertTrue(statsPublisher.publishStat("file_00001_a", stats));
-      fillStatMap("500", "4000");
-      assertTrue(statsPublisher.publishStat("file_00001_b", stats));
-
-      // cleanUp
-      assertTrue(statsAggregator.cleanUp("file_00000"));
-
-      // now clean-up just for one key
-      String rows0 = statsAggregator.aggregateStats("file_00000", StatsSetupConst.ROW_COUNT);
-      assertEquals("0", rows0);
-      String usize0 = statsAggregator.aggregateStats("file_00000",
-          StatsSetupConst.RAW_DATA_SIZE);
-      assertEquals("0", usize0);
-
-      // this should still be in the table
-      String rows1 = statsAggregator.aggregateStats("file_00001", StatsSetupConst.ROW_COUNT);
-      assertEquals("900", rows1);
-      String usize1 = statsAggregator.aggregateStats("file_00001",
-          StatsSetupConst.RAW_DATA_SIZE);
-      assertEquals("7000", usize1);
-
-      assertTrue(statsAggregator.cleanUp("file_0000"));
-
-      // close connections
-      assertTrue(statsPublisher.closeConnection(sc));
-      assertTrue(statsAggregator.closeConnection(sc));
-
-      System.out.println("StatsAggregator - clean-up - OK");
-    } catch (Throwable e) {
-      e.printStackTrace();
-      throw e;
-    }
-  }
-
-}
diff --git a/ql/src/test/queries/clientpositive/lb_fs_stats.q b/ql/src/test/queries/clientpositive/lb_fs_stats.q
index 7f31797..11d3a5f 100644
--- a/ql/src/test/queries/clientpositive/lb_fs_stats.q
+++ b/ql/src/test/queries/clientpositive/lb_fs_stats.q
@@ -15,5 +15,3 @@ ALTER TABLE test_tab SKEWED BY (key) ON ("484") STORED AS DIRECTORIES;
 INSERT OVERWRITE TABLE test_tab PARTITION (part = '1') SELECT * FROM src;
 
 describe formatted test_tab partition (part='1');
-
-set hive.stats.dbclass=jdbc:derby;
diff --git a/ql/src/test/queries/clientpositive/metadata_only_queries_with_filters.q b/ql/src/test/queries/clientpositive/metadata_only_queries_with_filters.q
index a6f1148..95d033f 100644
--- a/ql/src/test/queries/clientpositive/metadata_only_queries_with_filters.q
+++ b/ql/src/test/queries/clientpositive/metadata_only_queries_with_filters.q
@@ -51,4 +51,3 @@ select count(*) from stats_tbl_part;
 select count(*)/2 from stats_tbl_part;
 drop table stats_tbl_part;
 set hive.compute.query.using.stats=false;
-set hive.stats.dbclass=jdbc:derby;
diff --git a/ql/src/test/queries/clientpositive/statsfs.q b/ql/src/test/queries/clientpositive/statsfs.q
index 82a2295..0d29067 100644
--- a/ql/src/test/queries/clientpositive/statsfs.q
+++ b/ql/src/test/queries/clientpositive/statsfs.q
@@ -59,5 +59,3 @@ describe formatted t1 partition (ds='2008-04-09',hr='12');
 drop table t1;
 
 set hive.exec.dynamic.partition.mode=strict;
-
-set hive.stats.dbclass=jdbc:derby;
-- 
1.7.12.4 (Apple Git-37)