From cabc2525498e4b8f94f8ef1d36fcd320974dc4f4 Mon Sep 17 00:00:00 2001 From: Ashutosh Chauhan Date: Wed, 7 Oct 2015 17:45:50 -0700 Subject: [PATCH] HIVE-12065 : FS stats collection may generate incorrect stats for multi-insert query --- .../hadoop/hive/ql/stats/DummyStatsAggregator.java | 12 +++-- .../hadoop/hive/ql/stats/DummyStatsPublisher.java | 15 +++--- .../hive/ql/stats/KeyVerifyingStatsAggregator.java | 10 ++-- .../hadoop/hive/ql/exec/FileSinkOperator.java | 8 +-- .../org/apache/hadoop/hive/ql/exec/StatsTask.java | 31 +++++++---- .../hadoop/hive/ql/exec/TableScanOperator.java | 7 ++- .../org/apache/hadoop/hive/ql/exec/Utilities.java | 31 ++++++++++- .../apache/hadoop/hive/ql/exec/mr/ExecDriver.java | 14 +++-- .../hive/ql/exec/spark/SparkPlanGenerator.java | 16 +++--- .../apache/hadoop/hive/ql/exec/tez/DagUtils.java | 5 +- .../hive/ql/index/AggregateIndexHandler.java | 1 - .../hive/ql/index/TableBasedIndexHandler.java | 7 --- .../hive/ql/index/bitmap/BitmapIndexHandler.java | 1 - .../hive/ql/index/compact/CompactIndexHandler.java | 1 - .../hive/ql/io/rcfile/stats/PartialScanMapper.java | 7 ++- .../hive/ql/io/rcfile/stats/PartialScanTask.java | 11 ++-- .../hive/ql/io/rcfile/stats/PartialScanWork.java | 9 ++++ .../hadoop/hive/ql/optimizer/GenMRTableScan1.java | 3 ++ .../hadoop/hive/ql/optimizer/GenMapRedUtils.java | 2 +- .../hadoop/hive/ql/parse/ProcessAnalyzeTable.java | 4 +- .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 8 +-- .../ql/parse/spark/SparkProcessAnalyzeTable.java | 2 + .../apache/hadoop/hive/ql/plan/FileSinkDesc.java | 16 +++++- .../org/apache/hadoop/hive/ql/plan/StatsWork.java | 15 ++++-- .../apache/hadoop/hive/ql/plan/TableScanDesc.java | 12 ++++- .../hive/ql/stats/CounterStatsAggregator.java | 8 +-- .../hive/ql/stats/CounterStatsAggregatorSpark.java | 6 +-- .../hive/ql/stats/CounterStatsAggregatorTez.java | 10 ++-- .../hive/ql/stats/CounterStatsPublisher.java | 7 ++- .../hadoop/hive/ql/stats/StatsAggregator.java | 7 +-- .../hive/ql/stats/StatsCollectionContext.java | 63 ++++++++++++++++++++++ .../hadoop/hive/ql/stats/StatsPublisher.java | 8 ++- .../hadoop/hive/ql/stats/fs/FSStatsAggregator.java | 23 ++++---- .../hadoop/hive/ql/stats/fs/FSStatsPublisher.java | 32 ++++++----- .../hive/ql/stats/jdbc/JDBCStatsAggregator.java | 18 +++---- .../hive/ql/stats/jdbc/JDBCStatsPublisher.java | 22 ++++---- .../hadoop/hive/ql/exec/TestFileSinkOperator.java | 13 +++-- .../hive/ql/exec/TestStatsPublisherEnhanced.java | 61 +++++++++++---------- .../infer_bucket_sort_multi_insert.q | 1 + ql/src/test/queries/clientpositive/multi_insert.q | 2 +- .../queries/clientpositive/multi_insert_gby2.q | 2 +- .../queries/clientpositive/multi_insert_gby3.q | 2 +- .../clientpositive/multi_insert_lateral_view.q | 1 + .../queries/clientpositive/multi_insert_mixed.q | 2 +- .../multi_insert_move_tasks_share_dependencies.q | 2 +- .../clientpositive/multi_insert_union_src.q | 2 +- 46 files changed, 359 insertions(+), 181 deletions(-) create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/stats/StatsCollectionContext.java diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/DummyStatsAggregator.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/DummyStatsAggregator.java index 327eabc..eb3f6eb 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/DummyStatsAggregator.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/DummyStatsAggregator.java @@ -18,9 +18,7 @@ package org.apache.hadoop.hive.ql.stats; -import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.exec.Task; /** * An test implementation for StatsAggregator. @@ -34,8 +32,9 @@ // This is a test. The parameter hive.test.dummystats.aggregator's value // denotes the method which needs to throw an error. - public boolean connect(Configuration hconf, Task sourceTask) { - errorMethod = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVETESTMODEDUMMYSTATAGGR); + @Override + public boolean connect(StatsCollectionContext scc) { + errorMethod = HiveConf.getVar(scc.getHiveConf(), HiveConf.ConfVars.HIVETESTMODEDUMMYSTATAGGR); if (errorMethod.equalsIgnoreCase("connect")) { return false; } @@ -43,17 +42,20 @@ public boolean connect(Configuration hconf, Task sourceTask) { return true; } + @Override public String aggregateStats(String keyPrefix, String statType) { return null; } - public boolean closeConnection() { + @Override + public boolean closeConnection(StatsCollectionContext scc) { if (errorMethod.equalsIgnoreCase("closeConnection")) { return false; } return true; } + @Override public boolean cleanUp(String keyPrefix) { if (errorMethod.equalsIgnoreCase("cleanUp")) { return false; diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/DummyStatsPublisher.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/DummyStatsPublisher.java index 1f6e80f..9f1fdb4 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/DummyStatsPublisher.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/DummyStatsPublisher.java @@ -20,7 +20,6 @@ import java.util.Map; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; /** @@ -36,8 +35,9 @@ // This is a test. The parameter hive.test.dummystats.publisher's value // denotes the method which needs to throw an error. 
- public boolean init(Configuration hconf) { - errorMethod = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVETESTMODEDUMMYSTATPUB); + @Override + public boolean init(StatsCollectionContext context) { + errorMethod = HiveConf.getVar(context.getHiveConf(), HiveConf.ConfVars.HIVETESTMODEDUMMYSTATPUB); if (errorMethod.equalsIgnoreCase("init")) { return false; } @@ -45,8 +45,9 @@ public boolean init(Configuration hconf) { return true; } - public boolean connect(Configuration hconf) { - errorMethod = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVETESTMODEDUMMYSTATPUB); + @Override + public boolean connect(StatsCollectionContext context) { + errorMethod = HiveConf.getVar(context.getHiveConf(), HiveConf.ConfVars.HIVETESTMODEDUMMYSTATPUB); if (errorMethod.equalsIgnoreCase("connect")) { return false; } @@ -54,6 +55,7 @@ public boolean connect(Configuration hconf) { return true; } + @Override public boolean publishStat(String fileID, Map stats) { if (errorMethod.equalsIgnoreCase("publishStat")) { return false; @@ -61,7 +63,8 @@ public boolean publishStat(String fileID, Map stats) { return true; } - public boolean closeConnection() { + @Override + public boolean closeConnection(StatsCollectionContext context) { if (errorMethod.equalsIgnoreCase("closeConnection")) { return false; } diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/KeyVerifyingStatsAggregator.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/KeyVerifyingStatsAggregator.java index cb0b584..4e00316 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/KeyVerifyingStatsAggregator.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/KeyVerifyingStatsAggregator.java @@ -18,8 +18,6 @@ package org.apache.hadoop.hive.ql.stats; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.session.SessionState; /** @@ -30,10 +28,12 @@ public class KeyVerifyingStatsAggregator implements StatsAggregator { - public boolean connect(Configuration hconf, Task sourceTask) { + @Override + public boolean connect(StatsCollectionContext scc) { return true; } + @Override public String aggregateStats(String keyPrefix, String statType) { SessionState ss = SessionState.get(); // Have to use the length instead of the actual prefix because the prefix is location dependent @@ -43,10 +43,12 @@ public String aggregateStats(String keyPrefix, String statType) { return null; } - public boolean closeConnection() { + @Override + public boolean closeConnection(StatsCollectionContext scc) { return true; } + @Override public boolean cleanUp(String keyPrefix) { return true; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 39944a9..5fa27ca 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -61,9 +61,9 @@ import org.apache.hadoop.hive.ql.plan.ListBucketingCtx; import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.plan.SkewedColumnPositionPair; -import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.ql.stats.StatsCollectionTaskIndependent; +import org.apache.hadoop.hive.ql.stats.StatsCollectionContext; import org.apache.hadoop.hive.ql.stats.StatsPublisher; import org.apache.hadoop.hive.serde2.SerDeException; import 
org.apache.hadoop.hive.serde2.SerDeStats; @@ -1138,7 +1138,9 @@ private void publishStats() throws HiveException { return; } - if (!statsPublisher.connect(hconf)) { + StatsCollectionContext sContext = new StatsCollectionContext(hconf); + sContext.setStatsTmpDir(conf.getStatsTmpDir()); + if (!statsPublisher.connect(sContext)) { // just return, stats gathering should not block the main query LOG.error("StatsPublishing error: cannot connect to database"); if (isStatsReliable) { @@ -1205,7 +1207,7 @@ private void publishStats() throws HiveException { } } } - if (!statsPublisher.closeConnection()) { + if (!statsPublisher.closeConnection(sContext)) { // The original exception is lost. // Not changing the interface to maintain backward compatibility if (isStatsReliable) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java index 41ece04..9775645 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.hadoop.hive.ql.stats.StatsAggregator; import org.apache.hadoop.hive.ql.stats.StatsCollectionTaskIndependent; +import org.apache.hadoop.hive.ql.stats.StatsCollectionContext; import org.apache.hadoop.hive.ql.stats.StatsFactory; import org.apache.hadoop.hive.ql.stats.StatsPublisher; import org.apache.hadoop.util.StringUtils; @@ -134,13 +135,14 @@ private int aggregateStats() { StatsAggregator statsAggregator = null; int ret = 0; - + StatsCollectionContext scc = null; try { // Stats setup: Warehouse wh = new Warehouse(conf); if (!getWork().getNoStatsAggregator() && !getWork().isNoScanAnalyzeCommand()) { try { - statsAggregator = createStatsAggregator(conf); + scc = getContext(); + statsAggregator = createStatsAggregator(scc); } catch (HiveException e) { if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) { throw e; @@ -241,7 +243,7 @@ private int aggregateStats() { } } finally { if (statsAggregator != null) { - statsAggregator.closeConnection(); + statsAggregator.closeConnection(scc); } } // The return value of 0 indicates success, @@ -268,7 +270,7 @@ private String getAggregationPrefix(boolean counter, Table table, Partition part return prefix.toString(); } - private StatsAggregator createStatsAggregator(HiveConf conf) throws HiveException { + private StatsAggregator createStatsAggregator(StatsCollectionContext scc) throws HiveException { String statsImpl = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS); StatsFactory factory = StatsFactory.newFactory(statsImpl, conf); if (factory == null) { @@ -277,21 +279,30 @@ private StatsAggregator createStatsAggregator(HiveConf conf) throws HiveExceptio // initialize stats publishing table for noscan which has only stats task // the rest of MR task following stats task initializes it in ExecDriver.java StatsPublisher statsPublisher = factory.getStatsPublisher(); - if (!statsPublisher.init(conf)) { // creating stats table if not exists + if (!statsPublisher.init(scc)) { // creating stats table if not exists throw new HiveException(ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg()); } - Task sourceTask = getWork().getSourceTask(); - if (sourceTask == null) { - throw new HiveException(ErrorMsg.STATSAGGREGATOR_SOURCETASK_NULL.getErrorCodedMsg()); - } + // manufacture a StatsAggregator StatsAggregator statsAggregator = factory.getStatsAggregator(); - if (!statsAggregator.connect(conf, 
sourceTask)) { + if (!statsAggregator.connect(scc)) { throw new HiveException(ErrorMsg.STATSAGGREGATOR_CONNECTION_ERROR.getErrorCodedMsg(statsImpl)); } return statsAggregator; } + private StatsCollectionContext getContext() throws HiveException { + + StatsCollectionContext scc = new StatsCollectionContext(conf); + Task sourceTask = getWork().getSourceTask(); + if (sourceTask == null) { + throw new HiveException(ErrorMsg.STATSAGGREGATOR_SOURCETASK_NULL.getErrorCodedMsg()); + } + scc.setTask(sourceTask); + scc.setStatsTmpDir(this.getWork().getStatsTmpDir()); + return scc; + } + private boolean existStats(Map parameters) { return parameters.containsKey(StatsSetupConst.ROW_COUNT) || parameters.containsKey(StatsSetupConst.NUM_FILES) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java index cbf02e9..22f7520 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.ql.stats.StatsCollectionTaskIndependent; +import org.apache.hadoop.hive.ql.stats.StatsCollectionContext; import org.apache.hadoop.hive.ql.stats.StatsPublisher; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; @@ -282,7 +283,9 @@ private void publishStats() throws HiveException { // Initializing a stats publisher StatsPublisher statsPublisher = Utilities.getStatsPublisher(jc); - if (!statsPublisher.connect(jc)) { + StatsCollectionContext sc = new StatsCollectionContext(jc); + sc.setStatsTmpDir(conf.getTmpStatsDir()); + if (!statsPublisher.connect(sc)) { // just return, stats gathering should not block the main query. 
if (isLogInfoEnabled) { LOG.info("StatsPublishing error: cannot connect to database."); @@ -318,7 +321,7 @@ private void publishStats() throws HiveException { LOG.info("publishing : " + key + " : " + statsToPublish.toString()); } } - if (!statsPublisher.closeConnection()) { + if (!statsPublisher.closeConnection(sc)) { if (isStatsReliable) { throw new HiveException(ErrorMsg.STATSPUBLISHER_CLOSING_ERROR.getErrorCodedMsg()); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 5b21af9..b1ab1b5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -18,8 +18,6 @@ package org.apache.hadoop.hive.ql.exec; -import static com.google.common.base.Preconditions.checkNotNull; - import java.beans.DefaultPersistenceDelegate; import java.beans.Encoder; import java.beans.ExceptionListener; @@ -102,6 +100,7 @@ import org.apache.hadoop.hive.common.HiveInterruptUtils; import org.apache.hadoop.hive.common.HiveStatsUtils; import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.Warehouse; @@ -160,6 +159,7 @@ import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty; import org.apache.hadoop.hive.ql.plan.SparkWork; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.plan.api.Adjacency; import org.apache.hadoop.hive.ql.plan.api.Graph; import org.apache.hadoop.hive.ql.session.SessionState; @@ -3933,4 +3933,31 @@ public static int getDPColOffset(FileSinkDesc conf) { } } + public static List getStatsTmpDirs(BaseWork work, Configuration conf) { + + List statsTmpDirs = new ArrayList<>(); + if (!StatsSetupConst.StatDB.fs.name().equalsIgnoreCase(HiveConf.getVar(conf, ConfVars.HIVESTATSDBCLASS))) { + // no-op for non-fs stats collection + return statsTmpDirs; + } + // if its auto-stats gather for inserts or CTAS, stats dir will be in FileSink + Set> ops = work.getAllLeafOperators(); + if (work instanceof MapWork) { + // if its an anlayze statement, stats dir will be in TableScan + ops.addAll(work.getAllRootOperators()); + } + for (Operator op : ops) { + OperatorDesc desc = op.getConf(); + String statsTmpDir = null; + if (desc instanceof FileSinkDesc) { + statsTmpDir = ((FileSinkDesc)desc).getStatsTmpDir(); + } else if (desc instanceof TableScanDesc) { + statsTmpDir = ((TableScanDesc) desc).getTmpStatsDir(); + } + if (statsTmpDir != null && !statsTmpDir.isEmpty()) { + statsTmpDirs.add(statsTmpDir); + } + } + return statsTmpDirs; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java index d9225a9..b799a17 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java @@ -29,6 +29,7 @@ import java.util.Collections; import java.util.List; import java.util.Properties; +import java.util.Set; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -75,6 +76,7 @@ import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.hive.ql.stats.StatsCollectionContext; import 
org.apache.hadoop.hive.ql.stats.StatsFactory; import org.apache.hadoop.hive.ql.stats.StatsPublisher; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -243,7 +245,7 @@ public int execute(DriverContext driverContext) { try { String partitioner = HiveConf.getVar(job, ConfVars.HIVEPARTITIONER); - job.setPartitionerClass((Class) JavaUtils.loadClass(partitioner)); + job.setPartitionerClass(JavaUtils.loadClass(partitioner)); } catch (ClassNotFoundException e) { throw new RuntimeException(e.getMessage(), e); } @@ -289,7 +291,7 @@ public int execute(DriverContext driverContext) { LOG.info("Using " + inpFormat); try { - job.setInputFormat((Class) JavaUtils.loadClass(inpFormat)); + job.setInputFormat(JavaUtils.loadClass(inpFormat)); } catch (ClassNotFoundException e) { throw new RuntimeException(e.getMessage(), e); } @@ -408,7 +410,13 @@ public int execute(DriverContext driverContext) { StatsFactory factory = StatsFactory.newFactory(job); if (factory != null) { statsPublisher = factory.getStatsPublisher(); - if (!statsPublisher.init(job)) { // creating stats table if not exists + List statsTmpDir = Utilities.getStatsTmpDirs(mWork, job); + if (rWork != null) { + statsTmpDir.addAll(Utilities.getStatsTmpDirs(rWork, job)); + } + StatsCollectionContext sc = new StatsCollectionContext(job); + sc.setStatsTmpDirs(statsTmpDir); + if (!statsPublisher.init(sc)) { // creating stats table if not exists if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) { throw new HiveException(ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java index 4c3ee4b..51e66ac 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty; import org.apache.hadoop.hive.ql.plan.SparkWork; +import org.apache.hadoop.hive.ql.stats.StatsCollectionContext; import org.apache.hadoop.hive.ql.stats.StatsFactory; import org.apache.hadoop.hive.ql.stats.StatsPublisher; import org.apache.hadoop.io.Writable; @@ -65,11 +66,11 @@ private final PerfLogger perfLogger = SessionState.getPerfLogger(); private static final Log LOG = LogFactory.getLog(SparkPlanGenerator.class); - private JavaSparkContext sc; + private final JavaSparkContext sc; private final JobConf jobConf; - private Context context; - private Path scratchDir; - private SparkReporter sparkReporter; + private final Context context; + private final Path scratchDir; + private final SparkReporter sparkReporter; private Map cloneToWork; private final Map workToTranMap; private final Map workToParentWorkTranMap; @@ -270,8 +271,7 @@ private JobConf cloneJobConf(BaseWork work) throws Exception { // Make sure we'll use a different plan path from the original one HiveConf.setVar(cloned, HiveConf.ConfVars.PLAN, ""); try { - cloned.setPartitionerClass((Class) - JavaUtils.loadClass(HiveConf.getVar(cloned, HiveConf.ConfVars.HIVEPARTITIONER))); + cloned.setPartitionerClass(JavaUtils.loadClass(HiveConf.getVar(cloned, HiveConf.ConfVars.HIVEPARTITIONER))); } catch (ClassNotFoundException e) { String msg = "Could not find partitioner class: " + e.getMessage() + " which is specified by: " + HiveConf.ConfVars.HIVEPARTITIONER.varname; @@ -315,7 +315,9 @@ private void 
initStatsPublisher(BaseWork work) throws HiveException { StatsFactory factory = StatsFactory.newFactory(jobConf); if (factory != null) { statsPublisher = factory.getStatsPublisher(); - if (!statsPublisher.init(jobConf)) { // creating stats table if not exists + StatsCollectionContext sc = new StatsCollectionContext(jobConf); + sc.setStatsTmpDirs(Utilities.getStatsTmpDirs(work, jobConf)); + if (!statsPublisher.init(sc)) { // creating stats table if not exists if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) { throw new HiveException( ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index 19da1c3..bf950cb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -74,6 +74,7 @@ import org.apache.hadoop.hive.ql.plan.TezWork; import org.apache.hadoop.hive.ql.plan.TezWork.VertexType; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.stats.StatsCollectionContext; import org.apache.hadoop.hive.ql.stats.StatsFactory; import org.apache.hadoop.hive.ql.stats.StatsPublisher; import org.apache.hadoop.hive.shims.Utils; @@ -1094,8 +1095,10 @@ public Vertex createVertex(JobConf conf, BaseWork work, StatsPublisher statsPublisher; StatsFactory factory = StatsFactory.newFactory(conf); if (factory != null) { + StatsCollectionContext sCntxt = new StatsCollectionContext(conf); + sCntxt.setStatsTmpDirs(Utilities.getStatsTmpDirs(work, conf)); statsPublisher = factory.getStatsPublisher(); - if (!statsPublisher.init(conf)) { // creating stats table if not exists + if (!statsPublisher.init(sCntxt)) { // creating stats table if not exists if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) { throw new HiveException(ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java index e67996d..68709b4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java @@ -153,7 +153,6 @@ private void createAggregationFunction(List indexTblCols, String pr builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGETEZFILES, false); Task rootTask = IndexUtils.createRootTask(builderConf, inputs, outputs, command, (LinkedHashMap) partSpec, indexTableName, dbName); - super.setStatsDir(builderConf); return rootTask; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java index a019350..807959e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java @@ -116,13 +116,6 @@ return null; } - protected void setStatsDir(HiveConf builderConf) { - String statsDir; - if ((statsDir = builderConf.get(StatsSetupConst.STATS_TMP_LOC)) != null) { - getConf().set(StatsSetupConst.STATS_TMP_LOC, statsDir); - } - } - protected List getPartKVPairStringArray( LinkedHashMap partSpec) { List ret = new ArrayList(partSpec.size()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java 
b/ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java index b076933..cb191ac 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java @@ -289,7 +289,6 @@ public void analyzeIndexDefinition(Table baseTable, Index index, Task rootTask = IndexUtils.createRootTask(builderConf, inputs, outputs, command, partSpec, indexTableName, dbName); - super.setStatsDir(builderConf); return rootTask; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java index 1dbe230..586e16d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java @@ -150,7 +150,6 @@ public void analyzeIndexDefinition(Table baseTable, Index index, builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGETEZFILES, false); Task rootTask = IndexUtils.createRootTask(builderConf, inputs, outputs, command, partSpec, indexTableName, dbName); - super.setStatsDir(builderConf); return rootTask; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java index 3e1ef0a..be3a671 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileValueBufferWrapper; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.stats.CounterStatsPublisher; +import org.apache.hadoop.hive.ql.stats.StatsCollectionContext; import org.apache.hadoop.hive.ql.stats.StatsFactory; import org.apache.hadoop.hive.ql.stats.StatsPublisher; import org.apache.hadoop.hive.shims.CombineHiveKey; @@ -145,7 +146,9 @@ private void publishStats() throws HiveException { throw new HiveException(ErrorMsg.STATSPUBLISHER_NOT_OBTAINED.getErrorCodedMsg()); } - if (!statsPublisher.connect(jc)) { + StatsCollectionContext sc = new StatsCollectionContext(jc); + sc.setStatsTmpDir(jc.get(StatsSetupConst.STATS_TMP_LOC, "")); + if (!statsPublisher.connect(sc)) { // should fail since stats gathering is main purpose of the job LOG.error("StatsPublishing error: cannot connect to database"); throw new HiveException(ErrorMsg.STATSPUBLISHER_CONNECTION_ERROR.getErrorCodedMsg()); @@ -170,7 +173,7 @@ private void publishStats() throws HiveException { throw new HiveException(ErrorMsg.STATSPUBLISHER_PUBLISHING_ERROR.getErrorCodedMsg()); } - if (!statsPublisher.closeConnection()) { + if (!statsPublisher.closeConnection(sc)) { // The original exception is lost. 
// Not changing the interface to maintain backward compatibility throw new HiveException(ErrorMsg.STATSPUBLISHER_CLOSING_ERROR.getErrorCodedMsg()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java index cee0878..8bebd0f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java @@ -30,6 +30,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.common.StatsSetupConst.StatDB; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.DriverContext; @@ -48,6 +50,7 @@ import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.hive.ql.stats.StatsCollectionContext; import org.apache.hadoop.hive.ql.stats.StatsFactory; import org.apache.hadoop.hive.ql.stats.StatsPublisher; import org.apache.hadoop.io.NullWritable; @@ -145,7 +148,7 @@ public int execute(DriverContext driverContext) { LOG.info("Using " + inpFormat); try { - job.setInputFormat((Class) JavaUtils.loadClass(inpFormat)); + job.setInputFormat(JavaUtils.loadClass(inpFormat)); } catch (ClassNotFoundException e) { throw new RuntimeException(e.getMessage(), e); } @@ -175,7 +178,7 @@ public int execute(DriverContext driverContext) { HiveConf.setVar(job, HiveConf.ConfVars.HIVE_STATS_KEY_PREFIX, work.getAggKey()); - + job.set(StatsSetupConst.STATS_TMP_LOC, work.getStatsTmpDir()); try { addInputPaths(job, work); @@ -205,7 +208,9 @@ public int execute(DriverContext driverContext) { StatsFactory factory = StatsFactory.newFactory(job); if (factory != null) { statsPublisher = factory.getStatsPublisher(); - if (!statsPublisher.init(job)) { // creating stats table if not exists + StatsCollectionContext sc = new StatsCollectionContext(job); + sc.setStatsTmpDir(work.getStatsTmpDir()); + if (!statsPublisher.init(sc)) { // creating stats table if not exists if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) { throw new HiveException(ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java index c0a8ae7..b7fe339 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java @@ -42,6 +42,7 @@ private transient List inputPaths; private String aggKey; + private String statsTmpDir; public PartialScanWork() { } @@ -101,4 +102,12 @@ public void setAggKey(String aggKey) { this.aggKey = aggKey; } + public String getStatsTmpDir() { + return statsTmpDir; + } + + public void setStatsTmpDir(String statsTmpDir) { + this.statsTmpDir = statsTmpDir; + } + } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java index eed1d7c..a78f888 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java @@ -64,6 +64,7 @@ 
public GenMRTableScan1() { * @param opProcCtx * context */ + @Override public Object process(Node nd, Stack stack, NodeProcessorCtx opProcCtx, Object... nodeOutputs) throws SemanticException { TableScanOperator op = (TableScanOperator) nd; @@ -121,6 +122,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx opProcCtx, StatsWork statsWork = new StatsWork(op.getConf().getTableMetadata().getTableSpec()); statsWork.setAggKey(op.getConf().getStatsAggPrefix()); + statsWork.setStatsTmpDir(op.getConf().getTmpStatsDir()); statsWork.setSourceTask(currTask); statsWork.setStatsReliable(parseCtx.getConf().getBoolVar( HiveConf.ConfVars.HIVE_STATS_RELIABLE)); @@ -195,6 +197,7 @@ private void handlePartialScanCommand(TableScanOperator op, GenMRProcContext ctx PartialScanWork scanWork = new PartialScanWork(inputPaths); scanWork.setMapperCannotSpanPartns(true); scanWork.setAggKey(aggregationKey); + scanWork.setStatsTmpDir(op.getConf().getTmpStatsDir()); // stats work statsWork.setPartialScanAnalyzeCommand(true); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index c696fd5..e8bd33d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -1416,7 +1416,7 @@ public static void addStatsTask(FileSinkOperator nd, MoveTask mvTask, statsWork.setSourceTask(currTask); statsWork.setStatsReliable(hconf.getBoolVar(ConfVars.HIVE_STATS_RELIABLE)); - + statsWork.setStatsTmpDir(nd.getConf().getStatsTmpDir()); if (currTask.getWork() instanceof MapredWork) { MapredWork mrWork = (MapredWork) currTask.getWork(); mrWork.getMapWork().setGatheringStats(true); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java index f8d6905..214fd65 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java @@ -70,7 +70,7 @@ public Object process(Node nd, Stack stack, throws SemanticException { GenTezProcContext context = (GenTezProcContext) procContext; - + TableScanOperator tableScan = (TableScanOperator) nd; ParseContext parseContext = context.parseContext; @@ -124,6 +124,7 @@ public Object process(Node nd, Stack stack, StatsWork statsWork = new StatsWork(tableScan.getConf().getTableMetadata().getTableSpec()); statsWork.setAggKey(tableScan.getConf().getStatsAggPrefix()); + statsWork.setStatsTmpDir(tableScan.getConf().getTmpStatsDir()); statsWork.setSourceTask(context.currentTask); statsWork.setStatsReliable(parseContext.getConf().getBoolVar(HiveConf.ConfVars.HIVE_STATS_RELIABLE)); Task statsTask = TaskFactory.get(statsWork, parseContext.getConf()); @@ -181,6 +182,7 @@ private void handlePartialScanCommand(TableScanOperator tableScan, ParseContext PartialScanWork scanWork = new PartialScanWork(inputPaths); scanWork.setMapperCannotSpanPartns(true); scanWork.setAggKey(aggregationKey); + scanWork.setStatsTmpDir(tableScan.getConf().getTmpStatsDir()); // stats work statsWork.setPartialScanAnalyzeCommand(true); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 7a54aec..82f71bf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -6638,8 
+6638,8 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) fileSinkDesc.setStatsAggPrefix(fileSinkDesc.getDirName().toString()); if (HiveConf.getVar(conf, HIVESTATSDBCLASS).equalsIgnoreCase(StatDB.fs.name())) { String statsTmpLoc = ctx.getExtTmpPathRelTo(queryTmpdir).toString(); - LOG.info("Set stats collection dir : " + statsTmpLoc); - conf.set(StatsSetupConst.STATS_TMP_LOC, statsTmpLoc); + fileSinkDesc.setStatsTmpDir(statsTmpLoc); + LOG.debug("Set stats collection dir : " + statsTmpLoc); } if (dest_part != null) { @@ -9542,8 +9542,8 @@ private void setupStats(TableScanDesc tsDesc, QBParseInfo qbp, Table tab, String } else { if (HiveConf.getVar(conf, HIVESTATSDBCLASS).equalsIgnoreCase(StatDB.fs.name())) { String statsTmpLoc = ctx.getExtTmpPathRelTo(tab.getPath()).toString(); - LOG.info("Set stats collection dir : " + statsTmpLoc); - conf.set(StatsSetupConst.STATS_TMP_LOC, statsTmpLoc); + LOG.debug("Set stats collection dir : " + statsTmpLoc); + tsDesc.setTmpStatsDir(statsTmpLoc); } tsDesc.setGatherStats(true); tsDesc.setStatsReliable(conf.getBoolVar(HiveConf.ConfVars.HIVE_STATS_RELIABLE)); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java index 66e148f..e7f12db 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java @@ -121,6 +121,7 @@ public Object process(Node nd, Stack stack, StatsWork statsWork = new StatsWork(tableScan.getConf().getTableMetadata().getTableSpec()); statsWork.setAggKey(tableScan.getConf().getStatsAggPrefix()); + statsWork.setStatsTmpDir(tableScan.getConf().getTmpStatsDir()); statsWork.setSourceTask(context.currentTask); statsWork.setStatsReliable(parseContext.getConf().getBoolVar(HiveConf.ConfVars.HIVE_STATS_RELIABLE)); Task statsTask = TaskFactory.get(statsWork, parseContext.getConf()); @@ -176,6 +177,7 @@ private void handlePartialScanCommand(TableScanOperator tableScan, ParseContext PartialScanWork scanWork = new PartialScanWork(inputPaths); scanWork.setMapperCannotSpanPartns(true); scanWork.setAggKey(aggregationKey); + scanWork.setStatsTmpDir(op.getConf().getTmpStatsDir()); // stats work statsWork.setPartialScanAnalyzeCommand(true); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java index f73b502..9d6318a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java @@ -86,6 +86,7 @@ private boolean statsReliable; private ListBucketingCtx lbCtx; private int maxStatsKeyPrefixLength = -1; + private String statsTmpDir; private boolean statsCollectRawDataSize; @@ -156,7 +157,8 @@ public Object clone() throws CloneNotSupportedException { ret.setDpSortState(dpSortState); ret.setWriteType(writeType); ret.setTransactionId(txnId); - return (Object) ret; + ret.setStatsTmpDir(statsTmpDir); + return ret; } @Explain(displayName = "directory", explainLevels = { Level.EXTENDED }) @@ -229,7 +231,7 @@ public boolean isMultiFileSpray() { public void setMultiFileSpray(boolean multiFileSpray) { this.multiFileSpray = multiFileSpray; } - + /** * @return destination is temporary */ @@ -465,4 +467,14 @@ public Table getTable() { public void setTable(Table table) { this.table = table; } + + + public String getStatsTmpDir() { + return statsTmpDir; + } + + public 
void setStatsTmpDir(String statsCollectionTempDir) { + this.statsTmpDir = statsCollectionTempDir; + } + } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java index c8515db..d87022d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java @@ -57,6 +57,9 @@ // so this is set by DriverContext in runtime private transient Task sourceTask; + // used by FS based stats collector + private String statsTmpDir; + public StatsWork() { } @@ -72,10 +75,6 @@ public StatsWork(LoadFileDesc loadFileDesc) { this.loadFileDesc = loadFileDesc; } - public StatsWork(boolean statsReliable) { - this.statsReliable = statsReliable; - } - public TableSpec getTableSpecs() { return tableSpecs; } @@ -97,6 +96,14 @@ public String getAggKey() { return aggKey; } + public String getStatsTmpDir() { + return statsTmpDir; + } + + public void setStatsTmpDir(String statsTmpDir) { + this.statsTmpDir = statsTmpDir; + } + public boolean getNoStatsAggregator() { return noStatsAggregator; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java index 98bce96..6661ce6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java @@ -24,7 +24,6 @@ import java.util.Map; import org.apache.hadoop.hive.ql.exec.PTFUtils; -import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.parse.TableSample; @@ -70,6 +69,7 @@ private boolean gatherStats; private boolean statsReliable; private int maxStatsKeyPrefixLength = -1; + private String tmpStatsDir; private ExprNodeGenericFuncDesc filterExpr; private transient Serializable filterObject; @@ -203,6 +203,14 @@ public boolean isGatherStats() { return gatherStats; } + public String getTmpStatsDir() { + return tmpStatsDir; + } + + public void setTmpStatsDir(String tmpStatsDir) { + this.tmpStatsDir = tmpStatsDir; + } + public List getVirtualCols() { return virtualCols; } @@ -264,7 +272,7 @@ public Integer getRowLimitExplain() { public void setBucketFileNameMapping(Map bucketFileNameMapping) { this.bucketFileNameMapping = bucketFileNameMapping; } - + public void setIsMetadataOnly(boolean metadata_only) { isMetadataOnly = metadata_only; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregator.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregator.java index 16b4460..b9863d9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregator.java @@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.mr.ExecDriver; -import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; import org.apache.hadoop.mapred.Counters; import org.apache.hadoop.mapred.JobClient; @@ -40,10 +39,11 @@ private JobClient jc; @Override - public boolean connect(Configuration hconf, Task sourceTask) { + public boolean connect(StatsCollectionContext scc) { + Task sourceTask = scc.getTask(); if (sourceTask instanceof MapRedTask) { try { - jc = new JobClient(toJobConf(hconf)); + jc = new JobClient(toJobConf(scc.getHiveConf())); 
RunningJob job = jc.getJob(((MapRedTask)sourceTask).getJobID()); if (job != null) { counters = job.getCounters(); @@ -71,7 +71,7 @@ public String aggregateStats(String counterGrpName, String statType) { } @Override - public boolean closeConnection() { + public boolean closeConnection(StatsCollectionContext scc) { try { jc.close(); } catch (IOException e) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorSpark.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorSpark.java index 13f6024..4c01b25 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorSpark.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorSpark.java @@ -33,8 +33,8 @@ @SuppressWarnings("rawtypes") @Override - public boolean connect(Configuration hconf, Task sourceTask) { - SparkTask task = (SparkTask) sourceTask; + public boolean connect(StatsCollectionContext scc) { + SparkTask task = (SparkTask) scc.getTask(); sparkCounters = task.getSparkCounters(); if (sparkCounters == null) { return false; @@ -52,7 +52,7 @@ public String aggregateStats(String keyPrefix, String statType) { } @Override - public boolean closeConnection() { + public boolean closeConnection(StatsCollectionContext scc) { return true; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorTez.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorTez.java index 02e8c0b..662c106 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorTez.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorTez.java @@ -18,11 +18,8 @@ package org.apache.hadoop.hive.ql.stats; -import java.io.IOException; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.tez.TezTask; import org.apache.tez.common.counters.TezCounters; @@ -46,10 +43,11 @@ public CounterStatsAggregatorTez() { } @Override - public boolean connect(Configuration hconf, Task sourceTask) { + public boolean connect(StatsCollectionContext scc) { + Task sourceTask = scc.getTask(); if (!(sourceTask instanceof TezTask)) { delegate = true; - return mrAggregator.connect(hconf, sourceTask); + return mrAggregator.connect(scc); } counters = ((TezTask) sourceTask).getTezCounters(); return counters != null; @@ -75,7 +73,7 @@ public String aggregateStats(String keyPrefix, String statType) { } @Override - public boolean closeConnection() { + public boolean closeConnection(StatsCollectionContext scc) { return true; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsPublisher.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsPublisher.java index bf7d027..e5f1400 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsPublisher.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsPublisher.java @@ -22,7 +22,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.exec.MapredContext; import org.apache.hadoop.mapred.Reporter; @@ -33,12 +32,12 @@ private Reporter reporter; @Override - public boolean init(Configuration hconf) { + public boolean init(StatsCollectionContext context) { return true; } @Override - public boolean connect(Configuration hconf) { + public boolean connect(StatsCollectionContext statsContext) { 
MapredContext context = MapredContext.get(); if (context == null || context.getReporter() == null) { return false; @@ -61,7 +60,7 @@ public boolean publishStat(String fileID, Map stats) { return true; } @Override - public boolean closeConnection() { + public boolean closeConnection(StatsCollectionContext context) { return true; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsAggregator.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsAggregator.java index 0ae0489..b115daf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsAggregator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsAggregator.java @@ -18,9 +18,6 @@ package org.apache.hadoop.hive.ql.stats; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.ql.exec.Task; - /** * An interface for any possible implementation for gathering statistics. */ @@ -35,7 +32,7 @@ * @param sourceTask * @return true if connection is successful, false otherwise. */ - public boolean connect(Configuration hconf, Task sourceTask); + public boolean connect(StatsCollectionContext scc); /** * This method aggregates a given statistic from all tasks (partial stats). @@ -65,7 +62,7 @@ * * @return true if close connection is successful, false otherwise. */ - public boolean closeConnection(); + public boolean closeConnection(StatsCollectionContext scc); /** * This method is called after all statistics have been aggregated. Since we support multiple diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsCollectionContext.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsCollectionContext.java new file mode 100644 index 0000000..ae6f2ac --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsCollectionContext.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.stats; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.Task; + +public class StatsCollectionContext { + + private final Configuration hiveConf; + private Task task; + private List statsTmpDirs; + + public List getStatsTmpDirs() { + return statsTmpDirs; + } + + public void setStatsTmpDirs(List statsTmpDirs) { + this.statsTmpDirs = statsTmpDirs; + } + + public void setStatsTmpDir(String statsTmpDir) { + this.statsTmpDirs = statsTmpDir == null ? 
new ArrayList() : + Arrays.asList(new String[]{statsTmpDir}); + } + + public StatsCollectionContext(Configuration hiveConf) { + super(); + this.hiveConf = hiveConf; + } + + public Configuration getHiveConf() { + return hiveConf; + } + + public Task getTask() { + return task; + } + + public void setTask(Task task) { + this.task = task; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsPublisher.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsPublisher.java index 845ec6a..3631b83 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsPublisher.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsPublisher.java @@ -20,8 +20,6 @@ import java.util.Map; -import org.apache.hadoop.conf.Configuration; - /** * An interface for any possible implementation for publishing statics. */ @@ -37,14 +35,14 @@ * intermediate stats database. * @return true if initialization is successful, false otherwise. */ - public boolean init(Configuration hconf); + public boolean init(StatsCollectionContext context); /** * This method connects to the intermediate statistics database. * @param hconf HiveConf that contains the connection parameters. * @return true if connection is successful, false otherwise. */ - public boolean connect(Configuration hconf); + public boolean connect(StatsCollectionContext context); /** * This method publishes a given statistic into a disk storage, possibly HBase or MySQL. @@ -66,6 +64,6 @@ /** * This method closes the connection to the temporary storage. */ - public boolean closeConnection(); + public boolean closeConnection(StatsCollectionContext context); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java index be025fb..6dfc178 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java @@ -26,15 +26,14 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.common.StatsSetupConst; -import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.stats.StatsAggregator; +import org.apache.hadoop.hive.ql.stats.StatsCollectionContext; import org.apache.hadoop.hive.ql.stats.StatsCollectionTaskIndependent; import com.esotericsoftware.kryo.io.Input; @@ -44,17 +43,17 @@ private List>> statsList; private Map> statsMap; private FileSystem fs; - private Configuration conf; @Override - public boolean connect(Configuration hconf, Task sourceTask) { - conf = hconf; - Path statsDir = new Path(hconf.get(StatsSetupConst.STATS_TMP_LOC)); + public boolean connect(StatsCollectionContext scc) { + List statsDirs = scc.getStatsTmpDirs(); + assert statsDirs.size() == 1 : "Found multiple stats dirs: " + statsDirs; + Path statsDir = new Path(statsDirs.get(0)); LOG.debug("About to read stats from : " + statsDir); statsMap = new HashMap>(); try { - fs = statsDir.getFileSystem(hconf); + fs = statsDir.getFileSystem(scc.getHiveConf()); statsList = new ArrayList>>(); FileStatus[] status = fs.listStatus(statsDir, new PathFilter() { @Override @@ -98,11 +97,15 @@ public String aggregateStats(String partID, String statType) { } @Override - public boolean 
closeConnection() {
-    LOG.debug("About to delete stats tmp dir");
+  public boolean closeConnection(StatsCollectionContext scc) {
+    List<String> statsDirs = scc.getStatsTmpDirs();
+    assert statsDirs.size() == 1 : "Found multiple stats dirs: " + statsDirs;
+    Path statsDir = new Path(statsDirs.get(0));
+
+    LOG.debug("About to delete stats tmp dir :" + statsDir);
     try {
-      fs.delete(new Path(conf.get(StatsSetupConst.STATS_TMP_LOC)),true);
+      fs.delete(statsDir,true);
       return true;
     } catch (IOException e) {
       LOG.error(e);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsPublisher.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsPublisher.java
index ce96064..aa2bf62 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsPublisher.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsPublisher.java
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
@@ -30,6 +31,7 @@
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.stats.StatsCollectionTaskIndependent;
+import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 
 import com.esotericsoftware.kryo.io.Output;
@@ -41,12 +43,14 @@
   private Map<String, Map<String,String>> statsMap; // map from partID -> (statType->value)
 
   @Override
-  public boolean init(Configuration hconf) {
-    Path statsDir = new Path(hconf.get(StatsSetupConst.STATS_TMP_LOC));
-    LOG.debug("Initing FSStatsPublisher with : " + statsDir);
+  public boolean init(StatsCollectionContext context) {
     try {
-      statsDir.getFileSystem(hconf).mkdirs(statsDir);
-      LOG.info("created : " + statsDir);
+      for (String tmpDir : context.getStatsTmpDirs()) {
+        Path statsDir = new Path(tmpDir);
+        LOG.debug("Initing FSStatsPublisher with : " + statsDir);
+        statsDir.getFileSystem(context.getHiveConf()).mkdirs(statsDir);
+        LOG.info("created : " + statsDir);
+      }
       return true;
     } catch (IOException e) {
       LOG.error(e);
@@ -55,9 +59,11 @@ public boolean init(Configuration hconf) {
   }
 
   @Override
-  public boolean connect(Configuration hconf) {
-    conf = hconf;
-    Path statsDir = new Path(hconf.get(StatsSetupConst.STATS_TMP_LOC));
+  public boolean connect(StatsCollectionContext context) {
+    conf = context.getHiveConf();
+    List<String> statsDirs = context.getStatsTmpDirs();
+    assert statsDirs.size() == 1 : "Found multiple stats dirs: " + statsDirs;
+    Path statsDir = new Path(statsDirs.get(0));
     LOG.debug("Connecting to : " + statsDir);
     statsMap = new HashMap<String, Map<String,String>>();
     try {
@@ -85,14 +91,16 @@ public boolean publishStat(String partKV, Map<String, String> stats) {
   }
 
   @Override
-  public boolean closeConnection() {
-    Path statsDir = new Path(conf.get(StatsSetupConst.STATS_TMP_LOC));
+  public boolean closeConnection(StatsCollectionContext context) {
+    List<String> statsDirs = context.getStatsTmpDirs();
+    assert statsDirs.size() == 1 : "Found multiple stats dirs: " + statsDirs;
+    Path statsDir = new Path(statsDirs.get(0));
     try {
       Path statsFile = new Path(statsDir,StatsSetupConst.STATS_FILE_PREFIX +conf.getInt("mapred.task.partition",0));
       LOG.debug("About to create stats file for this task : " + statsFile);
       Output output = new Output(statsFile.getFileSystem(conf).create(statsFile,true));
-      LOG.info("Created file : " + statsFile);
-      LOG.info("Writing stats in it : " + statsMap);
+      LOG.debug("Created file : " + statsFile);
+      LOG.debug("Writing stats in it : " + statsMap);
       Utilities.runtimeSerializationKryo.get().writeObject(output, statsMap);
       output.close();
       return true;
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java
index e92523e..d8c9926 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java
@@ -34,16 +34,15 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.stats.StatsAggregator;
+import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
 
 public class JDBCStatsAggregator implements StatsAggregator {
 
   private Connection conn;
   private String connectionString;
   private Configuration hiveconf;
-  private Task sourceTask;
   private final Map<String, String> columnMapping;
   private final Log LOG = LogFactory.getLog(this.getClass().getName());
   private int timeout = 30;
@@ -58,8 +57,8 @@ public JDBCStatsAggregator() {
   }
 
   @Override
-  public boolean connect(Configuration hiveconf, Task sourceTask) {
-    this.hiveconf = hiveconf;
+  public boolean connect(StatsCollectionContext scc) {
+    this.hiveconf = scc.getHiveConf();
     timeout = (int) HiveConf.getTimeVar(
         hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT, TimeUnit.SECONDS);
     connectionString = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING);
@@ -67,7 +66,6 @@ public boolean connect(Configuration hiveconf, Task sourceTask) {
     maxRetries = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_MAX);
     waitWindow = HiveConf.getTimeVar(
         hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_WAIT, TimeUnit.MILLISECONDS);
-    this.sourceTask = sourceTask;
 
     try {
       JavaUtils.loadClass(driver).newInstance();
@@ -159,14 +157,14 @@ public ResultSet run(PreparedStatement stmt) throws SQLException {
          return null;
        }
        // close the current connection
-        closeConnection();
+        closeConnection(null);
        long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, r);
        try {
          Thread.sleep(waitTime);
        } catch (InterruptedException iex) {
        }
        // getting a new connection
-        if (!connect(hiveconf, sourceTask)) {
+        if (!connect(new StatsCollectionContext(hiveconf))) {
          // if cannot reconnect, just fail because connect() already handles retries.
          LOG.error("Error during publishing aggregation. " + e);
          return null;
@@ -181,7 +179,7 @@ public ResultSet run(PreparedStatement stmt) throws SQLException {
   }
 
   @Override
-  public boolean closeConnection() {
+  public boolean closeConnection(StatsCollectionContext scc) {
 
     if (conn == null) {
       return true;
@@ -238,14 +236,14 @@ public Void run(PreparedStatement stmt) throws SQLException {
          return false;
        }
        // close the current connection
-        closeConnection();
+        closeConnection(null);
        long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, r);
        try {
          Thread.sleep(waitTime);
        } catch (InterruptedException iex) {
        }
        // getting a new connection
-        if (!connect(hiveconf, sourceTask)) {
+        if (!connect(new StatsCollectionContext(hiveconf))) {
          LOG.error("Error during clean-up. " + e);
          return false;
        }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java
index aeb3d27..0318a8c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java
@@ -38,6 +38,7 @@
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 
 public class JDBCStatsPublisher implements StatsPublisher {
@@ -59,8 +60,8 @@ public JDBCStatsPublisher() {
   }
 
   @Override
-  public boolean connect(Configuration hiveconf) {
-    this.hiveconf = hiveconf;
+  public boolean connect(StatsCollectionContext context) {
+    this.hiveconf = context.getHiveConf();
     maxRetries = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_MAX);
     waitWindow = HiveConf.getTimeVar(
         hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_WAIT, TimeUnit.MILLISECONDS);
@@ -209,15 +210,16 @@ private boolean handleSQLRecoverableException(Exception e, int failures) {
     if (failures >= maxRetries) {
       return false;
     }
+    StatsCollectionContext sCntxt = new StatsCollectionContext(hiveconf);
     // close the current connection
-    closeConnection();
+    closeConnection(sCntxt);
     long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, r);
     try {
       Thread.sleep(waitTime);
     } catch (InterruptedException iex) {
     }
     // get a new connection
-    if (!connect(hiveconf)) {
+    if (!connect(sCntxt)) {
       // if cannot reconnect, just fail because connect() already handles retries.
       LOG.error("Error during publishing aggregation. " + e);
       return false;
@@ -226,7 +228,7 @@ private boolean handleSQLRecoverableException(Exception e, int failures) {
   }
 
   @Override
-  public boolean closeConnection() {
+  public boolean closeConnection(StatsCollectionContext context) {
     if (conn == null) {
       return true;
     }
@@ -266,13 +268,13 @@ public boolean closeConnection() {
    * creating tables.).
    */
   @Override
-  public boolean init(Configuration hconf) {
+  public boolean init(StatsCollectionContext context) {
     Statement stmt = null;
     ResultSet rs = null;
     try {
-      this.hiveconf = hconf;
-      connectionString = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING);
-      String driver = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER);
+      this.hiveconf = context.getHiveConf();
+      connectionString = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING);
+      String driver = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER);
       JavaUtils.loadClass(driver).newInstance();
       synchronized(DriverManager.class) {
         DriverManager.setLoginTimeout(timeout);
@@ -339,7 +341,7 @@ public boolean init(Configuration hconf) {
           // do nothing
         }
       }
-      closeConnection();
+      closeConnection(context);
     }
     return true;
   }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
index 4594836..d22d022 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
@@ -42,6 +42,7 @@
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.stats.StatsAggregator;
+import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
@@ -198,8 +199,6 @@ public void testDeleteDynamicPartitioning() throws Exception {
   @Before
   public void setup() throws Exception {
     jc = new JobConf();
-    jc.set(StatsSetupConst.STATS_TMP_LOC, File.createTempFile("TestFileSinkOperator",
-        "stats").getPath());
     jc.set(HiveConf.ConfVars.HIVE_STATS_DEFAULT_PUBLISHER.varname,
         TFSOStatsPublisher.class.getName());
     jc.set(HiveConf.ConfVars.HIVE_STATS_DEFAULT_AGGREGATOR.varname,
@@ -857,12 +856,12 @@ public SerDeStats getSerDeStats() {
     static Map<String, String> stats;
 
     @Override
-    public boolean init(Configuration hconf) {
+    public boolean init(StatsCollectionContext context) {
       return true;
     }
 
     @Override
-    public boolean connect(Configuration hconf) {
+    public boolean connect(StatsCollectionContext context) {
       return true;
     }
@@ -873,7 +872,7 @@ public boolean publishStat(String fileID, Map<String, String> stats) {
     }
 
     @Override
-    public boolean closeConnection() {
+    public boolean closeConnection(StatsCollectionContext context) {
       return true;
     }
   }
@@ -881,7 +880,7 @@ public boolean closeConnection() {
 
   public static class TFSOStatsAggregator implements StatsAggregator {
 
     @Override
-    public boolean connect(Configuration hconf, Task sourceTask) {
+    public boolean connect(StatsCollectionContext scc) {
       return true;
     }
@@ -891,7 +890,7 @@ public String aggregateStats(String keyPrefix, String statType) {
     }
 
     @Override
-    public boolean closeConnection() {
+    public boolean closeConnection(StatsCollectionContext scc) {
       return true;
     }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestStatsPublisherEnhanced.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestStatsPublisherEnhanced.java
index 887716e..c257797 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestStatsPublisherEnhanced.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestStatsPublisherEnhanced.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.stats.StatsAggregator;
+import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.mapred.JobConf;
@@ -60,9 +61,10 @@ protected void setUp() {
   protected void tearDown() {
     StatsAggregator sa = factory.getStatsAggregator();
     assertNotNull(sa);
-    assertTrue(sa.connect(conf, null));
+    StatsCollectionContext sc = new StatsCollectionContext(conf);
+    assertTrue(sa.connect(sc));
     assertTrue(sa.cleanUp("file_0"));
-    assertTrue(sa.closeConnection());
+    assertTrue(sa.closeConnection(sc));
   }
 
   private void fillStatMap(String numRows, String rawDataSize) {
@@ -80,13 +82,14 @@ public void testStatsPublisherOneStat() throws Throwable {
       // instantiate stats publisher
       StatsPublisher statsPublisher = Utilities.getStatsPublisher((JobConf) conf);
       assertNotNull(statsPublisher);
-      assertTrue(statsPublisher.init(conf));
-      assertTrue(statsPublisher.connect(conf));
+      StatsCollectionContext sc = new StatsCollectionContext(conf);
+      assertTrue(statsPublisher.init(sc));
+      assertTrue(statsPublisher.connect(sc));
 
       // instantiate stats aggregator
       StatsAggregator statsAggregator = factory.getStatsAggregator();
       assertNotNull(statsAggregator);
-      assertTrue(statsAggregator.connect(conf, null));
+      assertTrue(statsAggregator.connect(sc));
 
       // publish stats
       fillStatMap("200", "1000");
@@ -109,8 +112,8 @@ public void testStatsPublisherOneStat() throws Throwable {
       assertEquals("3000", usize1);
 
       // close connections
-      assertTrue(statsPublisher.closeConnection());
-      assertTrue(statsAggregator.closeConnection());
+      assertTrue(statsPublisher.closeConnection(sc));
+      assertTrue(statsAggregator.closeConnection(sc));
 
       System.out
           .println("StatsPublisher - one stat published per key - aggregating matching key - OK");
@@ -128,13 +131,14 @@ public void testStatsPublisher() throws Throwable {
       StatsPublisher statsPublisher = Utilities.getStatsPublisher(
         (JobConf) conf);
       assertNotNull(statsPublisher);
-      assertTrue(statsPublisher.init(conf));
-      assertTrue(statsPublisher.connect(conf));
+      StatsCollectionContext sc = new StatsCollectionContext(conf);
+      assertTrue(statsPublisher.init(sc));
+      assertTrue(statsPublisher.connect(sc));
 
       // instantiate stats aggregator
       StatsAggregator statsAggregator = factory.getStatsAggregator();
       assertNotNull(statsAggregator);
-      assertTrue(statsAggregator.connect(conf, null));
+      assertTrue(statsAggregator.connect(sc));
 
       // statsAggregator.cleanUp("file_0000");
       // assertTrue(statsAggregator.connect(conf));
@@ -172,8 +176,8 @@ public void testStatsPublisher() throws Throwable {
       assertTrue(statsAggregator.cleanUp("file_0000"));
 
       // close connections
-      assertTrue(statsPublisher.closeConnection());
-      assertTrue(statsAggregator.closeConnection());
+      assertTrue(statsPublisher.closeConnection(sc));
+      assertTrue(statsAggregator.closeConnection(sc));
 
       System.out.println("StatsPublisher - basic functionality - OK");
     } catch (Throwable e) {
@@ -189,13 +193,14 @@ public void testStatsPublisherMultipleUpdates() throws Throwable {
       // instantiate stats publisher
       StatsPublisher statsPublisher = Utilities.getStatsPublisher((JobConf) conf);
       assertNotNull(statsPublisher);
-      assertTrue(statsPublisher.init(conf));
-      assertTrue(statsPublisher.connect(conf));
+      StatsCollectionContext sc = new StatsCollectionContext(conf);
+      assertTrue(statsPublisher.init(sc));
+      assertTrue(statsPublisher.connect(sc));
 
       // instantiate stats aggregator
       StatsAggregator statsAggregator = factory.getStatsAggregator();
       assertNotNull(statsAggregator);
-      assertTrue(statsAggregator.connect(conf, null));
+      assertTrue(statsAggregator.connect(sc));
 
       // publish stats
       fillStatMap("200", "1000");
@@ -236,8 +241,8 @@ public void testStatsPublisherMultipleUpdates() throws Throwable {
       assertTrue(statsAggregator.cleanUp("file_0000"));
 
       // close connections
-      assertTrue(statsPublisher.closeConnection());
-      assertTrue(statsAggregator.closeConnection());
+      assertTrue(statsPublisher.closeConnection(sc));
+      assertTrue(statsAggregator.closeConnection(sc));
 
       System.out.println("StatsPublisher - multiple updates - OK");
     } catch (Throwable e) {
@@ -254,13 +259,14 @@ public void testStatsPublisherMultipleUpdatesSubsetStatistics() throws Throwable
       // instantiate stats publisher
       StatsPublisher statsPublisher = Utilities.getStatsPublisher((JobConf) conf);
       assertNotNull(statsPublisher);
-      assertTrue(statsPublisher.init(conf));
-      assertTrue(statsPublisher.connect(conf));
+      StatsCollectionContext sc = new StatsCollectionContext(conf);
+      assertTrue(statsPublisher.init(sc));
+      assertTrue(statsPublisher.connect(sc));
 
       // instantiate stats aggregator
       StatsAggregator statsAggregator = factory.getStatsAggregator();
       assertNotNull(statsAggregator);
-      assertTrue(statsAggregator.connect(conf, null));
+      assertTrue(statsAggregator.connect(sc));
 
       // publish stats
       fillStatMap("200", "");
@@ -305,8 +311,8 @@ public void testStatsPublisherMultipleUpdatesSubsetStatistics() throws Throwable
       assertTrue(statsAggregator.cleanUp("file_0000"));
 
       // close connections
-      assertTrue(statsPublisher.closeConnection());
-      assertTrue(statsAggregator.closeConnection());
+      assertTrue(statsPublisher.closeConnection(sc));
+      assertTrue(statsAggregator.closeConnection(sc));
 
       System.out
           .println("StatsPublisher - (multiple updates + publishing subset of supported statistics) - OK");
@@ -325,13 +331,14 @@ public void testStatsAggregatorCleanUp() throws Throwable {
       // instantiate stats publisher
       StatsPublisher statsPublisher = Utilities.getStatsPublisher((JobConf) conf);
       assertNotNull(statsPublisher);
-      assertTrue(statsPublisher.init(conf));
-      assertTrue(statsPublisher.connect(conf));
+      StatsCollectionContext sc = new StatsCollectionContext(conf);
+      assertTrue(statsPublisher.init(sc));
+      assertTrue(statsPublisher.connect(sc));
 
       // instantiate stats aggregator
       StatsAggregator statsAggregator = factory.getStatsAggregator();
       assertNotNull(statsAggregator);
-      assertTrue(statsAggregator.connect(conf, null));
+      assertTrue(statsAggregator.connect(sc));
 
       // publish stats
       fillStatMap("200", "1000");
@@ -364,8 +371,8 @@ public void testStatsAggregatorCleanUp() throws Throwable {
       assertTrue(statsAggregator.cleanUp("file_0000"));
 
       // close connections
-      assertTrue(statsPublisher.closeConnection());
-      assertTrue(statsAggregator.closeConnection());
+      assertTrue(statsPublisher.closeConnection(sc));
+      assertTrue(statsAggregator.closeConnection(sc));
 
       System.out.println("StatsAggregator - clean-up - OK");
     } catch (Throwable e) {
diff --git a/ql/src/test/queries/clientpositive/infer_bucket_sort_multi_insert.q b/ql/src/test/queries/clientpositive/infer_bucket_sort_multi_insert.q
index e3992b8..3341df0 100644
--- a/ql/src/test/queries/clientpositive/infer_bucket_sort_multi_insert.q
+++ b/ql/src/test/queries/clientpositive/infer_bucket_sort_multi_insert.q
@@ -1,5 +1,6 @@
 set hive.exec.infer.bucket.sort=true;
 set hive.exec.infer.bucket.sort.num.buckets.power.two=true;
+set hive.stats.dbclass=fs;
 
 -- This tests inferring how data is bucketed/sorted from the operators in the reducer
 -- and populating that information in partitions' metadata. In particular, those cases
diff --git a/ql/src/test/queries/clientpositive/multi_insert.q b/ql/src/test/queries/clientpositive/multi_insert.q
index 5947985..1fdfa59 100644
--- a/ql/src/test/queries/clientpositive/multi_insert.q
+++ b/ql/src/test/queries/clientpositive/multi_insert.q
@@ -5,7 +5,7 @@ create table src_multi2 like src;
 
 set hive.merge.mapfiles=false;
 set hive.merge.mapredfiles=false;
-
+set hive.stats.dbclass=fs;
 explain
 from src
 insert overwrite table src_multi1 select * where key < 10
diff --git a/ql/src/test/queries/clientpositive/multi_insert_gby2.q b/ql/src/test/queries/clientpositive/multi_insert_gby2.q
index 46e2b19..fa29261 100644
--- a/ql/src/test/queries/clientpositive/multi_insert_gby2.q
+++ b/ql/src/test/queries/clientpositive/multi_insert_gby2.q
@@ -1,7 +1,7 @@
 --HIVE-3699 Multiple insert overwrite into multiple tables query stores same results in all tables
 create table e1 (count int);
 create table e2 (percentile double);
-
+set hive.stats.dbclass=fs;
 explain
 FROM (select key, cast(key as double) as value from src order by key) a
 INSERT OVERWRITE TABLE e1
diff --git a/ql/src/test/queries/clientpositive/multi_insert_gby3.q b/ql/src/test/queries/clientpositive/multi_insert_gby3.q
index 1221af4..d85ff9a 100644
--- a/ql/src/test/queries/clientpositive/multi_insert_gby3.q
+++ b/ql/src/test/queries/clientpositive/multi_insert_gby3.q
@@ -2,7 +2,7 @@ create table e1 (key string, keyD double);
 create table e2 (key string, keyD double, value string);
 create table e3 (key string, keyD double);
-
+set hive.stats.dbclass=fs;
 explain
 FROM (select key, cast(key as double) as keyD, value from src order by key) a
 INSERT OVERWRITE TABLE e1
diff --git a/ql/src/test/queries/clientpositive/multi_insert_lateral_view.q b/ql/src/test/queries/clientpositive/multi_insert_lateral_view.q
index acf905f..d80717f 100644
--- a/ql/src/test/queries/clientpositive/multi_insert_lateral_view.q
+++ b/ql/src/test/queries/clientpositive/multi_insert_lateral_view.q
@@ -1,3 +1,4 @@
+set hive.stats.dbclass=fs;
 -- SORT_QUERY_RESULTS
 
 create table src_10 as select * from src limit 10;
diff --git a/ql/src/test/queries/clientpositive/multi_insert_mixed.q b/ql/src/test/queries/clientpositive/multi_insert_mixed.q
index 6d91973..8fb577a 100644
--- a/ql/src/test/queries/clientpositive/multi_insert_mixed.q
+++ b/ql/src/test/queries/clientpositive/multi_insert_mixed.q
@@ -1,7 +1,7 @@
 create table src_multi1 like src;
 create table src_multi2 like src;
 create table src_multi3 like src;
-
+set hive.stats.dbclass=fs;
 -- Testing the case where a map work contains both shuffling (ReduceSinkOperator)
 -- and inserting to output table (FileSinkOperator).
diff --git a/ql/src/test/queries/clientpositive/multi_insert_move_tasks_share_dependencies.q b/ql/src/test/queries/clientpositive/multi_insert_move_tasks_share_dependencies.q
index 3117713..3ddaa47 100644
--- a/ql/src/test/queries/clientpositive/multi_insert_move_tasks_share_dependencies.q
+++ b/ql/src/test/queries/clientpositive/multi_insert_move_tasks_share_dependencies.q
@@ -1,5 +1,5 @@
 set hive.multi.insert.move.tasks.share.dependencies=true;
-
+set hive.stats.dbclass=fs;
 -- SORT_QUERY_RESULTS
 
 create table src_multi1 like src;
diff --git a/ql/src/test/queries/clientpositive/multi_insert_union_src.q b/ql/src/test/queries/clientpositive/multi_insert_union_src.q
index 088d756..f9b6f87 100644
--- a/ql/src/test/queries/clientpositive/multi_insert_union_src.q
+++ b/ql/src/test/queries/clientpositive/multi_insert_union_src.q
@@ -1,7 +1,7 @@
 drop table if exists src2;
 drop table if exists src_multi1;
 drop table if exists src_multi1;
-
+set hive.stats.dbclass=fs;
 CREATE TABLE src2 as SELECT * FROM src;
 
 create table src_multi1 like src;
-- 
1.7.12.4 (Apple Git-37)