From fa71dd96966147c033c5c4ec72bbeae17a4287a3 Mon Sep 17 00:00:00 2001 From: Ashutosh Chauhan Date: Tue, 25 Aug 2015 13:26:55 -0700 Subject: [PATCH] HIVE-11645 : Add in-place updates for dynamic partitions loading --- .../org/apache/hadoop/hive/ql/exec/MoveTask.java | 3 +- .../org/apache/hadoop/hive/ql/exec/StatsTask.java | 4 +- .../hadoop/hive/ql/exec/tez/InPlaceUpdates.java | 65 +++++++++++++++++++++ .../hadoop/hive/ql/exec/tez/TezJobMonitor.java | 66 ++-------------------- .../org/apache/hadoop/hive/ql/metadata/Hive.java | 24 ++++++-- 5 files changed, 92 insertions(+), 70 deletions(-) create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/exec/tez/InPlaceUpdates.java diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index 50c4a96..7b4079f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -425,8 +425,7 @@ public int execute(DriverContext driverContext) { SessionState.get().getLineageState().setLineage(tbd.getSourcePath(), dc, table.getCols()); } - - console.printInfo("\tLoading partition " + entry.getKey()); + LOG.info("\tLoading partition " + entry.getKey()); } console.printInfo("\t Time taken for adding to write entity : " + (System.currentTimeMillis() - startTime)); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java index 2a8167a..b0aa05c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java @@ -183,7 +183,7 @@ private int aggregateStats() { db.alterTable(tableFullName, new Table(tTable)); - console.printInfo("Table " + tableFullName + " stats: [" + toString(parameters) + ']'); + LOG.info("Table " + tableFullName + " stats: [" + toString(parameters) + ']'); } else { // Partitioned table: // Need to get the old stats of the partition @@ -215,7 +215,7 @@ private int aggregateStats() { parameters.put(StatsSetupConst.STATS_GENERATED_VIA_STATS_TASK, StatsSetupConst.TRUE); updates.add(new Partition(table, tPart)); - console.printInfo("Partition " + tableFullName + partn.getSpec() + + LOG.info("Partition " + tableFullName + partn.getSpec() + " stats: [" + toString(parameters) + ']'); } if (!updates.isEmpty()) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/InPlaceUpdates.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/InPlaceUpdates.java new file mode 100644 index 0000000..6ecfe71 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/InPlaceUpdates.java @@ -0,0 +1,65 @@ +package org.apache.hadoop.hive.ql.exec.tez; + +import static org.fusesource.jansi.Ansi.ansi; +import static org.fusesource.jansi.internal.CLibrary.STDERR_FILENO; +import static org.fusesource.jansi.internal.CLibrary.STDOUT_FILENO; +import static org.fusesource.jansi.internal.CLibrary.isatty; + +import java.io.PrintStream; + +import jline.TerminalFactory; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.fusesource.jansi.Ansi; + +public class InPlaceUpdates { + + private static final int MIN_TERMINAL_WIDTH = 80; + + static boolean isUnixTerminal() { + + String os = System.getProperty("os.name"); + if (os.startsWith("Windows")) { + // we do not support Windows, we will revisit this if we really need it for windows. + return false; + } + + // We must be on some unix variant.. + // check if standard out is a terminal + try { + // isatty system call will return 1 if the file descriptor is terminal else 0 + if (isatty(STDOUT_FILENO) == 0) { + return false; + } + if (isatty(STDERR_FILENO) == 0) { + return false; + } + } catch (NoClassDefFoundError ignore) { + // These errors happen if the JNI lib is not available for your platform. + return false; + } catch (UnsatisfiedLinkError ignore) { + // These errors happen if the JNI lib is not available for your platform. + return false; + } + return true; + } + + public static boolean inPlaceEligible(HiveConf conf) { + boolean inPlaceUpdates = HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_INPLACE_PROGRESS); + + // we need at least 80 chars wide terminal to display in-place updates properly + return inPlaceUpdates && !SessionState.getConsole().getIsSilent() && isUnixTerminal() + && TerminalFactory.get().getWidth() >= MIN_TERMINAL_WIDTH; + } + + public static void reprintLine(PrintStream out, String line) { + out.print(ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString()); + out.flush(); + } + + public static void rePositionCursor(PrintStream ps) { + ps.print(ansi().cursorUp(0).toString()); + ps.flush(); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java index 1a4decf..1e1603b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java @@ -20,9 +20,6 @@ import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING; import static org.fusesource.jansi.Ansi.ansi; -import static org.fusesource.jansi.internal.CLibrary.STDOUT_FILENO; -import static org.fusesource.jansi.internal.CLibrary.STDERR_FILENO; -import static org.fusesource.jansi.internal.CLibrary.isatty; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; @@ -74,7 +71,7 @@ public class TezJobMonitor { private static final String CLASS_NAME = TezJobMonitor.class.getName(); - private static final int MIN_TERMINAL_WIDTH = 80; + private static final int COLUMN_1_WIDTH = 16; private static final int SEPARATOR_WIDTH = 80; @@ -156,42 +153,13 @@ public TezJobMonitor() { } } - private static boolean isUnixTerminal() { - - String os = System.getProperty("os.name"); - if (os.startsWith("Windows")) { - // we do not support Windows, we will revisit this if we really need it for windows. - return false; - } - - // We must be on some unix variant.. - // check if standard out is a terminal - try { - // isatty system call will return 1 if the file descriptor is terminal else 0 - if (isatty(STDOUT_FILENO) == 0) { - return false; - } - if (isatty(STDERR_FILENO) == 0) { - return false; - } - } catch (NoClassDefFoundError ignore) { - // These errors happen if the JNI lib is not available for your platform. - return false; - } catch (UnsatisfiedLinkError ignore) { - // These errors happen if the JNI lib is not available for your platform. - return false; - } - return true; - } - /** * NOTE: Use this method only if isUnixTerminal is true. * Erases the current line and prints the given line. * @param line - line to print */ public void reprintLine(String line) { - out.print(ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString()); - out.flush(); + InPlaceUpdates.reprintLine(out, line); lines++; } @@ -234,15 +202,6 @@ public void repositionCursor() { } /** - * NOTE: Use this method only if isUnixTerminal is true. - * Gets the width of the terminal - * @return - width of terminal - */ - public int getTerminalWidth() { - return TerminalFactory.get().getWidth(); - } - - /** * monitorExecution handles status printing, failures during execution and final status retrieval. * * @param dagClient client that was used to kick off the job @@ -265,26 +224,11 @@ public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr, Hi Set opts = new HashSet(); Heartbeater heartbeater = new Heartbeater(txnMgr, conf); long startTime = 0; - boolean isProfileEnabled = conf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) || + boolean isProfileEnabled = HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) || Utilities.isPerfOrAboveLogging(conf); - boolean inPlaceUpdates = conf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_INPLACE_PROGRESS); - boolean wideTerminal = false; - boolean isTerminal = inPlaceUpdates == true ? isUnixTerminal() : false; - - // we need at least 80 chars wide terminal to display in-place updates properly - if (isTerminal) { - if (getTerminalWidth() >= MIN_TERMINAL_WIDTH) { - wideTerminal = true; - } - } - - boolean inPlaceEligible = false; - if (inPlaceUpdates && isTerminal && wideTerminal && !console.getIsSilent()) { - inPlaceEligible = true; - } + boolean inPlaceEligible = InPlaceUpdates.inPlaceEligible(conf); shutdownList.add(dagClient); - console.printInfo("\n"); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_DAG); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING); @@ -470,7 +414,7 @@ private void printDagSummary(Map progressMap, LogHelper consol DAGClient dagClient, HiveConf conf, DAG dag) { /* Strings for headers and counters */ - String hiveCountersGroup = conf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP); + String hiveCountersGroup = HiveConf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP); Set statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS); TezCounters hiveCounters = null; try { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 396c070..983baa8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.metadata; import com.google.common.collect.Sets; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -81,6 +82,7 @@ import org.apache.hadoop.hive.ql.exec.FunctionTask; import org.apache.hadoop.hive.ql.exec.FunctionUtils; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.tez.InPlaceUpdates; import org.apache.hadoop.hive.ql.index.HiveIndexHandler; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils; @@ -101,6 +103,7 @@ import java.io.FileNotFoundException; import java.io.IOException; +import java.io.PrintStream; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -123,7 +126,6 @@ import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT; import static org.apache.hadoop.hive.serde.serdeConstants.STRING_TYPE_NAME; - /** * This class has functions that implement meta data/DDL operations using calls * to the metastore. @@ -1567,22 +1569,30 @@ private void constructOneLBLocationMap(FileStatus fSta, } } - if (validPartitions.size() == 0) { + int partsToLoad = validPartitions.size(); + if (partsToLoad == 0) { LOG.warn("No partition is generated by dynamic partitioning"); } - if (validPartitions.size() > conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS)) { - throw new HiveException("Number of dynamic partitions created is " + validPartitions.size() + if (partsToLoad > conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS)) { + throw new HiveException("Number of dynamic partitions created is " + partsToLoad + ", which is more than " + conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS) +". To solve this try to set " + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname - + " to at least " + validPartitions.size() + '.'); + + " to at least " + partsToLoad + '.'); } Table tbl = getTable(tableName); // for each dynamically created DP directory, construct a full partition spec // and load the partition based on that Iterator iter = validPartitions.iterator(); + LOG.info("Going to load " + partsToLoad + " partitions."); + PrintStream ps = null; + boolean inPlaceEligible = InPlaceUpdates.inPlaceEligible(conf); + if(inPlaceEligible) { + ps = SessionState.getConsole().getInfoStream(); + } + int partitionsLoaded = 0; while (iter.hasNext()) { // get the dynamically created directory Path partPath = iter.next(); @@ -1595,6 +1605,10 @@ private void constructOneLBLocationMap(FileStatus fSta, Partition newPartition = loadPartition(partPath, tbl, fullPartSpec, replace, holdDDLTime, true, listBucketingEnabled, false, isAcid); partitionsMap.put(fullPartSpec, newPartition); + if (inPlaceEligible) { + InPlaceUpdates.rePositionCursor(ps); + InPlaceUpdates.reprintLine(ps, "Loaded : " + ++partitionsLoaded + "/" + partsToLoad +" partitions."); + } LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec); } if (isAcid) { -- 1.7.12.4 (Apple Git-37)