From a7c73a04241e70b5915d576d8441305ab9c2ae08 Mon Sep 17 00:00:00 2001
From: Ashutosh Chauhan
Date: Thu, 21 Jan 2016 14:07:19 -0800
Subject: [PATCH] HIVE-12897 : Improve dynamic partition loading

---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java   | 17 +++++++++--------
 .../apache/hadoop/hive/metastore/MetaStoreUtils.java |  2 +-
 .../apache/hadoop/hive/metastore/ObjectStore.java    |  7 +++++--
 .../apache/hadoop/hive/ql/exec/FileSinkOperator.java |  8 ++++++++
 .../org/apache/hadoop/hive/ql/metadata/Hive.java     | 14 ++++----------
 .../index/RewriteParseContextGenerator.java          |  4 ++--
 .../ql/parse/ExplainSQRewriteSemanticAnalyzer.java   |  2 --
 .../hive/ql/parse/ExplainSemanticAnalyzer.java       |  2 +-
 .../hadoop/hive/ql/plan/DynamicPartitionCtx.java     | 20 +++++++++++++++++++-
 .../dynamic_partitions_with_whitelist.q.out          |  4 ++--
 10 files changed, 51 insertions(+), 29 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 97fe7bc..74a8749 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -210,7 +210,8 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
   public static final HiveConf.ConfVars[] metaConfVars = {
       HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL,
       HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL_DDL,
-      HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT
+      HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT,
+      HiveConf.ConfVars.METASTORE_PARTITION_NAME_WHITELIST_PATTERN
   };

   static {
@@ -1574,32 +1575,32 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
     HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD("hive.compactor.abortedtxn.threshold", 1000,
         "Number of aborted transactions involving a given table or partition that will trigger\n" +
         "a major compaction."),
-    
+
     COMPACTOR_INITIATOR_FAILED_THRESHOLD("hive.compactor.initiator.failed.compacts.threshold", 2,
         new RangeValidator(1, 20), "Number of consecutive compaction failures (per table/partition) " +
         "after which automatic compactions will not be scheduled any more. Note that this must be less " +
         "than hive.compactor.history.retention.failed."),
-    
+
     HIVE_COMPACTOR_CLEANER_RUN_INTERVAL("hive.compactor.cleaner.run.interval", "5000ms",
         new TimeValidator(TimeUnit.MILLISECONDS), "Time between runs of the cleaner thread"),
     COMPACTOR_JOB_QUEUE("hive.compactor.job.queue", "", "Used to specify name of Hadoop queue to which\n" +
         "Compaction jobs will be submitted. Set to empty string to let Hadoop choose the queue."),
-    
+
     COMPACTOR_HISTORY_RETENTION_SUCCEEDED("hive.compactor.history.retention.succeeded", 3,
         new RangeValidator(0, 100), "Determines how many successful compaction records will be " +
         "retained in compaction history for a given table/partition."),
-    
+
     COMPACTOR_HISTORY_RETENTION_FAILED("hive.compactor.history.retention.failed", 3,
         new RangeValidator(0, 100), "Determines how many failed compaction records will be " +
         "retained in compaction history for a given table/partition."),
-    
+
     COMPACTOR_HISTORY_RETENTION_ATTEMPTED("hive.compactor.history.retention.attempted", 2,
         new RangeValidator(0, 100), "Determines how many attempted compaction records will be " +
         "retained in compaction history for a given table/partition."),
-    
+
     COMPACTOR_HISTORY_REAPER_INTERVAL("hive.compactor.history.reaper.interval", "2m",
         new TimeValidator(TimeUnit.MILLISECONDS), "Determines how often compaction history reaper runs"),
-    
+
     HIVE_TIMEDOUT_TXN_REAPER_START("hive.timedout.txn.reaper.start", "100s",
         new TimeValidator(TimeUnit.MILLISECONDS), "Time delay of 1st reaper run after metastore start"),
     HIVE_TIMEDOUT_TXN_REAPER_INTERVAL("hive.timedout.txn.reaper.interval", "180s",
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
index eee7f1b..c8859f3 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
@@ -1618,7 +1618,7 @@ public static boolean compareFieldColumns(List<FieldSchema> schema1, List<Field
-  static String getPartitionValWithInvalidCharacter(List<String> partVals,
+  public static String getPartitionValWithInvalidCharacter(List<String> partVals,
       Pattern partitionValidationPattern) {
     if (partitionValidationPattern == null) {
       return null;
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index e044c73..b808728 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -290,7 +290,7 @@ public void setConf(Configuration conf) {
     String partitionValidationRegex =
         hiveConf.get(HiveConf.ConfVars.METASTORE_PARTITION_NAME_WHITELIST_PATTERN.name());
-    if (partitionValidationRegex != null && partitionValidationRegex.equals("")) {
+    if (partitionValidationRegex != null && !partitionValidationRegex.isEmpty()) {
       partitionValidationPattern = Pattern.compile(partitionValidationRegex);
     } else {
       partitionValidationPattern = null;
@@ -759,7 +759,7 @@ public boolean dropDatabase(String dbname) throws NoSuchObjectException, MetaExc
     String queryStr = "select name from org.apache.hadoop.hive.metastore.model.MDatabase";
     Query query = null;
-    
+
     openTransaction();
     try {
       query = pm.newQuery(queryStr);
@@ -1054,14 +1054,17 @@ public Table getTable(String dbName, String tableName) throws MetaException {
     return tbls;
   }

+  @Override
   public int getDatabaseCount() throws MetaException {
     return getObjectCount("name", MDatabase.class.getName());
   }

+  @Override
   public int getPartitionCount() throws MetaException {
     return getObjectCount("partitionName", MPartition.class.getName());
   }

+  @Override
   public int getTableCount() throws MetaException {
     return getObjectCount("tableName", MTable.class.getName());
   }
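Side note on the ObjectStore hunk above (commentary, not part of the patch): the old
condition "partitionValidationRegex.equals(\"\")" compiled the whitelist only when the
configured regex was the empty string, so a real pattern was silently ignored; the fixed
condition compiles it for any non-empty value. A minimal sketch of the corrected guard,
with illustrative names rather than ObjectStore's actual fields:

import java.util.regex.Pattern;

public class WhitelistGuardSketch {
  // Compile the partition-name whitelist only when a non-empty regex is configured;
  // return null to mean "no validation".
  static Pattern compileWhitelist(String partitionValidationRegex) {
    if (partitionValidationRegex != null && !partitionValidationRegex.isEmpty()) {
      return Pattern.compile(partitionValidationRegex);
    }
    return null;
  }
}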
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 3289cfc..14121b6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -39,6 +39,7 @@
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -696,6 +697,13 @@ public void process(Object row, int tag) throws HiveException {
         }
       }

+      String invalidPartitionVal;
+      if ((invalidPartitionVal = MetaStoreUtils.getPartitionValWithInvalidCharacter(dpVals, dpCtx.getWhiteListPattern())) != null) {
+        throw new HiveFatalException("Partition value '" + invalidPartitionVal +
+            "' contains a character not matched by whitelist pattern '" +
+            dpCtx.getWhiteListPattern().toString() + "'. " + "(configure with " +
+            HiveConf.ConfVars.METASTORE_PARTITION_NAME_WHITELIST_PATTERN.varname + ")");
+      }
       fpaths = getDynOutPaths(dpVals, lbDirName);

       // use SubStructObjectInspector to serialize the non-partitioning columns in the input row
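For context (not part of the patch): the new FileSinkOperator code above validates the
dynamic-partition values of every row against the whitelist before any output path is
created. A rough sketch of what MetaStoreUtils.getPartitionValWithInvalidCharacter is
assumed to do -- return the first value that does not fully match the pattern, or null
when all values pass:

import java.util.List;
import java.util.regex.Pattern;

public class PartitionValCheckSketch {
  static String firstInvalidPartitionVal(List<String> partVals, Pattern whitelist) {
    if (whitelist == null) {
      return null;                           // no pattern configured: accept everything
    }
    for (String val : partVals) {
      if (!whitelist.matcher(val).matches()) {
        return val;                          // offending value; the caller raises a fatal error
      }
    }
    return null;
  }
}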
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index efb50b2..50681c1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1427,7 +1427,7 @@ public void loadPartition(Path loadPath, String tableName,
    * @param isSrcLocal
    *          If the source directory is LOCAL
    * @param isAcid true if this is an ACID operation
-   * @throws JSONException 
+   * @throws JSONException
    */
   public Partition loadPartition(Path loadPath, Table tbl,
       Map<String, String> partSpec, boolean replace,
@@ -1622,7 +1622,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
    * @param txnId txnId, can be 0 unless isAcid == true
    * @return partition map details (PartitionSpec and Partition)
    * @throws HiveException
-   * @throws JSONException 
+   * @throws JSONException
    */
   public Map<Map<String, String>, Partition> loadDynamicPartitions(Path loadPath,
       String tableName, Map<String, String> partSpec, boolean replace,
@@ -1635,16 +1635,10 @@ private void constructOneLBLocationMap(FileStatus fSta,
         LinkedHashMap<Map<String, String>, Partition>();

     FileSystem fs = loadPath.getFileSystem(conf);
-    FileStatus[] leafStatus = HiveStatsUtils.getFileStatusRecurse(loadPath, numDP+1, fs);
+    FileStatus[] leafStatus = HiveStatsUtils.getFileStatusRecurse(loadPath, numDP, fs);
     // Check for empty partitions
     for (FileStatus s : leafStatus) {
-      try {
-        validatePartitionNameCharacters(
-            Warehouse.getPartValuesFromPartName(s.getPath().getParent().toString()));
-      } catch (MetaException e) {
-        throw new HiveException(e);
-      }
-      validPartitions.add(s.getPath().getParent());
+      validPartitions.add(s.getPath());
     }

     int partsToLoad = validPartitions.size();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteParseContextGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteParseContextGenerator.java
index 48105de..64f9734 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteParseContextGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteParseContextGenerator.java
@@ -102,9 +102,9 @@ ASTNode ast, Context ctx) throws SemanticException {
     QB qb = new QB(null, null, false);
     ASTNode child = ast;
-    ParseContext subPCtx = ((SemanticAnalyzer) sem).getParseContext();
+    ParseContext subPCtx = sem.getParseContext();
     subPCtx.setContext(ctx);
-    ((SemanticAnalyzer) sem).initParseCtx(subPCtx);
+    sem.initParseCtx(subPCtx);

     LOG.info("Starting Sub-query Semantic Analysis");
     sem.doPhase1(child, qb, sem.initPhase1Ctx(), null);
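The loadDynamicPartitions change above recurses only numDP levels instead of numDP+1, so
the listing returns the dynamic-partition directories themselves rather than the files
inside them, and the per-partition character validation is dropped here (rows with bad
values are now rejected earlier, in FileSinkOperator). A sketch of the directory listing,
under the assumption that HiveStatsUtils.getFileStatusRecurse behaves like a glob with
one wildcard per level below loadPath:

import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListDpDirsSketch {
  // List the paths exactly numDP levels below loadPath, i.e. one wildcard per
  // dynamic-partition column; each result is a candidate partition directory.
  static FileStatus[] listAtDepth(FileSystem fs, Path loadPath, int numDP) throws IOException {
    StringBuilder glob = new StringBuilder(loadPath.toUri().getPath());
    for (int i = 0; i < numDP; i++) {
      glob.append(Path.SEPARATOR).append("*");
    }
    return fs.globStatus(new Path(glob.toString()));
  }
}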
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSQRewriteSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSQRewriteSemanticAnalyzer.java
index 2c2339a..6f0f3a6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSQRewriteSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSQRewriteSemanticAnalyzer.java
@@ -17,13 +17,11 @@
  */
 package org.apache.hadoop.hive.ql.parse;

-import java.io.Serializable;
 import java.util.List;

 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.exec.ExplainSQRewriteTask;
-import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.plan.ExplainSQRewriteWork;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
index c1e9ec1..e1e3eb2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
@@ -79,7 +79,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
     if (tasks == null) {
       tasks = Collections.emptyList();
     }
-    
+
     FetchTask fetchTask = sem.getFetchTask();
     if (fetchTask != null) {
       // Initialize fetch work such that operator tree will be constructed.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
index 95d5635..e6ec3ce 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
@@ -21,10 +21,15 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.regex.Pattern;

 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.SemanticException;

 public class DynamicPartitionCtx implements Serializable {

@@ -44,12 +49,13 @@
   private List<String> dpNames; // dp column names
   private String defaultPartName; // default partition name in case of null or empty value
   private int maxPartsPerNode; // maximum dynamic partitions created per mapper/reducer
+  private Pattern whiteListPattern;

   public DynamicPartitionCtx() {
   }

   public DynamicPartitionCtx(Table tbl, Map<String, String> partSpec, String defaultPartName,
-      int maxParts) {
+      int maxParts) throws SemanticException {
     this.partSpec = partSpec;
     this.spNames = new ArrayList<String>();
     this.dpNames = new ArrayList<String>();
@@ -71,6 +77,13 @@ public DynamicPartitionCtx(Table tbl, Map<String, String> partSpec, String defau
     } else {
       this.spPath = null;
     }
+    String confVal;
+    try {
+      confVal = Hive.get().getMetaConf(ConfVars.METASTORE_PARTITION_NAME_WHITELIST_PATTERN.varname);
+    } catch (HiveException e) {
+      throw new SemanticException(e);
+    }
+    this.whiteListPattern = confVal == null || confVal.isEmpty() ? null : Pattern.compile(confVal);
   }

   public DynamicPartitionCtx(DynamicPartitionCtx dp) {
@@ -84,6 +97,11 @@ public DynamicPartitionCtx(DynamicPartitionCtx dp) {
     this.dpNames = dp.dpNames;
     this.defaultPartName = dp.defaultPartName;
     this.maxPartsPerNode = dp.maxPartsPerNode;
+    this.whiteListPattern = dp.whiteListPattern;
+  }
+
+  public Pattern getWhiteListPattern() {
+    return whiteListPattern;
   }

   public int getMaxPartitionsPerNode() {
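Purely illustrative (not part of the patch): how a whitelist such as the '[^9]*' used by
the negative test below behaves. A partition value must match the pattern in full, so
'val_129' (which contains a '9') is rejected while 'val_12' is accepted -- the situation
the updated q.out exercises:

import java.util.regex.Pattern;

public class WhitelistDemo {
  public static void main(String[] args) {
    Pattern whitelist = Pattern.compile("[^9]*");
    System.out.println(whitelist.matcher("val_12").matches());   // true  -> value accepted
    System.out.println(whitelist.matcher("val_129").matches());  // false -> value rejected
  }
}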
diff --git a/ql/src/test/results/clientnegative/dynamic_partitions_with_whitelist.q.out b/ql/src/test/results/clientnegative/dynamic_partitions_with_whitelist.q.out
index f069ae8..654d892 100644
--- a/ql/src/test/results/clientnegative/dynamic_partitions_with_whitelist.q.out
+++ b/ql/src/test/results/clientnegative/dynamic_partitions_with_whitelist.q.out
@@ -32,5 +32,5 @@ PREHOOK: type: QUERY
 PREHOOK: Input: default@source_table
 PREHOOK: Input: default@source_table@ds=2008-04-08/hr=11
 PREHOOK: Output: default@dest_table
-Failed with exception MetaException(message:Partition value 'val_129' contains a character not matched by whitelist pattern '[^9]*'. (configure with hive.metastore.partition.name.whitelist.pattern))
-FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.MoveTask
+#### A masked pattern was here ####
+FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask
-- 
1.7.12.4 (Apple Git-37)