From 37260e1a4ee58ccb6d301813b4ce1e30525fed0f Mon Sep 17 00:00:00 2001 From: Narayanan Venkateswaran Date: Wed, 2 Jun 2021 11:04:54 +0530 Subject: [PATCH] HIVE-25178 : Reduce number of getPartition calls during loadDynamicPartitions for External Tables --- .../org/apache/hadoop/hive/conf/HiveConf.java | 2 + .../apache/hadoop/hive/ql/metadata/Hive.java | 55 ++++++++---- .../clientpositive/load_dynamic_partition.q | 13 +++ .../llap/load_dynamic_partition.q.out | 84 +++++++++++++++++++ .../hadoop/hive/metastore/Warehouse.java | 81 +++++++++++++----- 5 files changed, 200 insertions(+), 35 deletions(-) create mode 100644 ql/src/test/queries/clientpositive/load_dynamic_partition.q create mode 100644 ql/src/test/results/clientpositive/llap/load_dynamic_partition.q.out diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 1d404e0b0d..27f1778e9a 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -4239,6 +4239,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal HIVE_LOAD_DYNAMIC_PARTITIONS_THREAD_COUNT("hive.load.dynamic.partitions.thread", 15, new SizeValidator(1L, true, 1024L, true), "Number of threads used to load dynamic partitions."), + HIVE_LOAD_DYNAMIC_PARTITIONS_SCAN_SPECIFIC_PARTITIONS("hive.load.dynamic.partitions.scan.specific.partitions", false, + "For the dynamic partitioned tables, scan only the specific partitions using the name from the list"), // If this is set all move tasks at the end of a multi-insert query will only begin once all // outputs are ready HIVE_MULTI_INSERT_MOVE_TASKS_SHARE_DEPENDENCIES( diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 6e3551b4e2..c8a352b520 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -2953,21 +2953,46 @@ private void constructOneLBLocationMap(FileStatus fSta, final SessionState parentSession = SessionState.get(); List> tasks = Lists.newLinkedList(); - // fetch all the partitions matching the part spec using the partition iterable - // this way the maximum batch size configuration parameter is considered - PartitionIterable partitionIterable = new PartitionIterable(Hive.get(), tbl, partSpec, - conf.getInt(MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX.getVarname(), 300)); - Iterator iterator = partitionIterable.iterator(); + final boolean scanPartitionsByName = conf.getBoolean( + ConfVars.HIVE_LOAD_DYNAMIC_PARTITIONS_SCAN_SPECIFIC_PARTITIONS.varname, false); + + if (scanPartitionsByName && !tbd.isDirectInsert() && !AcidUtils.isTransactionalTable(tbl)) { + //TODO: Need to create separate ticket for ACID table; ACID table can be a bigger change. + //Fetch only relevant partitions from HMS for checking old partitions + List partitionNames = new LinkedList<>(); + for(PartitionDetails details : partitionDetailsMap.values()) { + if (details.fullSpec != null && !details.fullSpec.isEmpty()) { + partitionNames.add(Warehouse.makeDynamicPartNameNoTrailingSeperator(details.fullSpec)); + } + } + List partitions = Hive.get().getPartitionsByNames(tbl, partitionNames); + for(Partition partition : partitions) { + LOG.info("HMS partition spec: {}", partition.getSpec()); + partitionDetailsMap.entrySet().parallelStream() + .filter(entry -> entry.getValue().fullSpec.equals(partition.getSpec())) + .findAny().ifPresent(entry -> { + entry.getValue().partition = partition; + entry.getValue().hasOldPartition = true; + }); + } + } else { - // Match valid partition path to partitions - while (iterator.hasNext()) { - Partition partition = iterator.next(); - partitionDetailsMap.entrySet().stream() - .filter(entry -> entry.getValue().fullSpec.equals(partition.getSpec())) - .findAny().ifPresent(entry -> { - 
entry.getValue().partition = partition; - entry.getValue().hasOldPartition = true; - }); + // fetch all the partitions matching the part spec using the partition iterable + // this way the maximum batch size configuration parameter is considered + PartitionIterable partitionIterable = new PartitionIterable(Hive.get(), tbl, partSpec, + conf.getInt(MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX.getVarname(), 300)); + Iterator iterator = partitionIterable.iterator(); + + // Match valid partition path to partitions + while (iterator.hasNext()) { + Partition partition = iterator.next(); + partitionDetailsMap.entrySet().parallelStream() + .filter(entry -> entry.getValue().fullSpec.equals(partition.getSpec())) + .findAny().ifPresent(entry -> { + entry.getValue().partition = partition; + entry.getValue().hasOldPartition = true; + }); + } } boolean isTxnTable = AcidUtils.isTransactionalTable(tbl); @@ -3042,7 +3067,7 @@ private void constructOneLBLocationMap(FileStatus fSta, Map, Partition> result = Maps.newLinkedHashMap(); try { futures = executor.invokeAll(tasks); - LOG.debug("Number of partitionsToAdd to be added is " + futures.size()); + LOG.info("Number of partitionsToAdd to be added is " + futures.size()); for (Future future : futures) { Partition partition = future.get(); result.put(partition.getSpec(), partition); diff --git a/ql/src/test/queries/clientpositive/load_dynamic_partition.q b/ql/src/test/queries/clientpositive/load_dynamic_partition.q new file mode 100644 index 0000000000..0dc0552edf --- /dev/null +++ b/ql/src/test/queries/clientpositive/load_dynamic_partition.q @@ -0,0 +1,13 @@ +set hive.exec.dynamic.partition.mode=nonstrict; +set hive.load.dynamic.partitions.scan.specific.partitions=true; + +CREATE EXTERNAL TABLE test_part_ext_1(i int) PARTITIONED BY (j string, k int); + +CREATE TABLE test_part(i int,j string); + +insert into test_part values(1, "test1"), (2, "test2"), (3, "test3"), +(4, "test4"), (5, "test5"), (6, "test6"), (7, "test7"), (8, "test8"), (9, 
"test9"), (10, "test10"); + +insert overwrite table test_part_ext_1 partition(j, k) select i,j,i from test_part; + +insert overwrite table test_part_ext_1 partition(j, k) select i,j,i from test_part; diff --git a/ql/src/test/results/clientpositive/llap/load_dynamic_partition.q.out b/ql/src/test/results/clientpositive/llap/load_dynamic_partition.q.out new file mode 100644 index 0000000000..d0de854c60 --- /dev/null +++ b/ql/src/test/results/clientpositive/llap/load_dynamic_partition.q.out @@ -0,0 +1,84 @@ +PREHOOK: query: CREATE EXTERNAL TABLE test_part_ext_1(i int) PARTITIONED BY (j string, k int) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@test_part_ext_1 +POSTHOOK: query: CREATE EXTERNAL TABLE test_part_ext_1(i int) PARTITIONED BY (j string, k int) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@test_part_ext_1 +PREHOOK: query: CREATE TABLE test_part(i int,j string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@test_part +POSTHOOK: query: CREATE TABLE test_part(i int,j string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@test_part +PREHOOK: query: insert into test_part values(1, "test1"), (2, "test2"), (3, "test3"), +(4, "test4"), (5, "test5"), (6, "test6"), (7, "test7"), (8, "test8"), (9, "test9"), (10, "test10") +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@test_part +POSTHOOK: query: insert into test_part values(1, "test1"), (2, "test2"), (3, "test3"), +(4, "test4"), (5, "test5"), (6, "test6"), (7, "test7"), (8, "test8"), (9, "test9"), (10, "test10") +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@test_part +POSTHOOK: Lineage: test_part.i SCRIPT [] +POSTHOOK: Lineage: test_part.j SCRIPT [] +PREHOOK: query: insert overwrite table test_part_ext_1 partition(j, k) select i,j,i from test_part 
+PREHOOK: type: QUERY +PREHOOK: Input: default@test_part +PREHOOK: Output: default@test_part_ext_1 +POSTHOOK: query: insert overwrite table test_part_ext_1 partition(j, k) select i,j,i from test_part +POSTHOOK: type: QUERY +POSTHOOK: Input: default@test_part +POSTHOOK: Output: default@test_part_ext_1 +POSTHOOK: Output: default@test_part_ext_1@j=test1/k=1 +POSTHOOK: Output: default@test_part_ext_1@j=test10/k=10 +POSTHOOK: Output: default@test_part_ext_1@j=test2/k=2 +POSTHOOK: Output: default@test_part_ext_1@j=test3/k=3 +POSTHOOK: Output: default@test_part_ext_1@j=test4/k=4 +POSTHOOK: Output: default@test_part_ext_1@j=test5/k=5 +POSTHOOK: Output: default@test_part_ext_1@j=test6/k=6 +POSTHOOK: Output: default@test_part_ext_1@j=test7/k=7 +POSTHOOK: Output: default@test_part_ext_1@j=test8/k=8 +POSTHOOK: Output: default@test_part_ext_1@j=test9/k=9 +POSTHOOK: Lineage: test_part_ext_1 PARTITION(j=test1,k=1).i SIMPLE [(test_part)test_part.FieldSchema(name:i, type:int, comment:null), ] +POSTHOOK: Lineage: test_part_ext_1 PARTITION(j=test10,k=10).i SIMPLE [(test_part)test_part.FieldSchema(name:i, type:int, comment:null), ] +POSTHOOK: Lineage: test_part_ext_1 PARTITION(j=test2,k=2).i SIMPLE [(test_part)test_part.FieldSchema(name:i, type:int, comment:null), ] +POSTHOOK: Lineage: test_part_ext_1 PARTITION(j=test3,k=3).i SIMPLE [(test_part)test_part.FieldSchema(name:i, type:int, comment:null), ] +POSTHOOK: Lineage: test_part_ext_1 PARTITION(j=test4,k=4).i SIMPLE [(test_part)test_part.FieldSchema(name:i, type:int, comment:null), ] +POSTHOOK: Lineage: test_part_ext_1 PARTITION(j=test5,k=5).i SIMPLE [(test_part)test_part.FieldSchema(name:i, type:int, comment:null), ] +POSTHOOK: Lineage: test_part_ext_1 PARTITION(j=test6,k=6).i SIMPLE [(test_part)test_part.FieldSchema(name:i, type:int, comment:null), ] +POSTHOOK: Lineage: test_part_ext_1 PARTITION(j=test7,k=7).i SIMPLE [(test_part)test_part.FieldSchema(name:i, type:int, comment:null), ] +POSTHOOK: Lineage: test_part_ext_1 
PARTITION(j=test8,k=8).i SIMPLE [(test_part)test_part.FieldSchema(name:i, type:int, comment:null), ] +POSTHOOK: Lineage: test_part_ext_1 PARTITION(j=test9,k=9).i SIMPLE [(test_part)test_part.FieldSchema(name:i, type:int, comment:null), ] +PREHOOK: query: insert overwrite table test_part_ext_1 partition(j, k) select i,j,i from test_part +PREHOOK: type: QUERY +PREHOOK: Input: default@test_part +PREHOOK: Output: default@test_part_ext_1 +POSTHOOK: query: insert overwrite table test_part_ext_1 partition(j, k) select i,j,i from test_part +POSTHOOK: type: QUERY +POSTHOOK: Input: default@test_part +POSTHOOK: Output: default@test_part_ext_1 +POSTHOOK: Output: default@test_part_ext_1@j=test1/k=1 +POSTHOOK: Output: default@test_part_ext_1@j=test10/k=10 +POSTHOOK: Output: default@test_part_ext_1@j=test2/k=2 +POSTHOOK: Output: default@test_part_ext_1@j=test3/k=3 +POSTHOOK: Output: default@test_part_ext_1@j=test4/k=4 +POSTHOOK: Output: default@test_part_ext_1@j=test5/k=5 +POSTHOOK: Output: default@test_part_ext_1@j=test6/k=6 +POSTHOOK: Output: default@test_part_ext_1@j=test7/k=7 +POSTHOOK: Output: default@test_part_ext_1@j=test8/k=8 +POSTHOOK: Output: default@test_part_ext_1@j=test9/k=9 +POSTHOOK: Lineage: test_part_ext_1 PARTITION(j=test1,k=1).i SIMPLE [(test_part)test_part.FieldSchema(name:i, type:int, comment:null), ] +POSTHOOK: Lineage: test_part_ext_1 PARTITION(j=test10,k=10).i SIMPLE [(test_part)test_part.FieldSchema(name:i, type:int, comment:null), ] +POSTHOOK: Lineage: test_part_ext_1 PARTITION(j=test2,k=2).i SIMPLE [(test_part)test_part.FieldSchema(name:i, type:int, comment:null), ] +POSTHOOK: Lineage: test_part_ext_1 PARTITION(j=test3,k=3).i SIMPLE [(test_part)test_part.FieldSchema(name:i, type:int, comment:null), ] +POSTHOOK: Lineage: test_part_ext_1 PARTITION(j=test4,k=4).i SIMPLE [(test_part)test_part.FieldSchema(name:i, type:int, comment:null), ] +POSTHOOK: Lineage: test_part_ext_1 PARTITION(j=test5,k=5).i SIMPLE [(test_part)test_part.FieldSchema(name:i, type:int, 
comment:null), ] +POSTHOOK: Lineage: test_part_ext_1 PARTITION(j=test6,k=6).i SIMPLE [(test_part)test_part.FieldSchema(name:i, type:int, comment:null), ] +POSTHOOK: Lineage: test_part_ext_1 PARTITION(j=test7,k=7).i SIMPLE [(test_part)test_part.FieldSchema(name:i, type:int, comment:null), ] +POSTHOOK: Lineage: test_part_ext_1 PARTITION(j=test8,k=8).i SIMPLE [(test_part)test_part.FieldSchema(name:i, type:int, comment:null), ] +POSTHOOK: Lineage: test_part_ext_1 PARTITION(j=test9,k=9).i SIMPLE [(test_part)test_part.FieldSchema(name:i, type:int, comment:null), ] diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java index e617a7d5bb..97df31664e 100755 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java @@ -537,21 +537,28 @@ public static String makePartPath(Map spec) /** * Makes a partition name from a specification - * @param spec - * @param addTrailingSeperator if true, adds a trailing separator e.g. 'ds=1/' + * @param spec The partition specification, key and value pairs. + * @param addTrailingSeperator If true, adds a trailing separator e.g. 'ds=1/'. + * @param dynamic If true, create a dynamic partition name. * @return partition name * @throws MetaException */ - public static String makePartName(Map spec, - boolean addTrailingSeperator) - throws MetaException { + public static String makePartNameUtil(Map spec, boolean addTrailingSeperator, boolean dynamic) + throws MetaException { StringBuilder suffixBuf = new StringBuilder(); int i = 0; for (Entry e : spec.entrySet()) { + // Throw an exception if it is not a dynamic partition. if (e.getValue() == null || e.getValue().length() == 0) { - throw new MetaException("Partition spec is incorrect. 
" + spec); + if (dynamic) { + break; + } + else { + throw new MetaException("Partition spec is incorrect. " + spec); + } } - if (i>0) { + + if (i > 0) { suffixBuf.append(Path.SEPARATOR); } suffixBuf.append(escapePathName(e.getKey())); @@ -559,31 +566,65 @@ public static String makePartName(Map spec, suffixBuf.append(escapePathName(e.getValue())); i++; } - if (addTrailingSeperator) { + + if (addTrailingSeperator && i > 0) { suffixBuf.append(Path.SEPARATOR); } + return suffixBuf.toString(); } + + /** + * Makes a partition name from a specification + * @param spec + * @param addTrailingSeperator if true, adds a trailing separator e.g. 'ds=1/' + * @return partition name + * @throws MetaException + */ + public static String makePartName(Map spec, + boolean addTrailingSeperator) + throws MetaException { + return makePartNameUtil(spec, addTrailingSeperator, false); + } + /** * Given a dynamic partition specification, return the path corresponding to the - * static part of partition specification. This is basically a copy of makePartName + * static part of partition specification. This is basically similar to makePartName * but we get rid of MetaException since it is not serializable. * @param spec * @return string representation of the static part of the partition specification. */ public static String makeDynamicPartName(Map spec) { - StringBuilder suffixBuf = new StringBuilder(); - for (Entry e : spec.entrySet()) { - if (e.getValue() != null && e.getValue().length() > 0) { - suffixBuf.append(escapePathName(e.getKey())); - suffixBuf.append('='); - suffixBuf.append(escapePathName(e.getValue())); - suffixBuf.append(Path.SEPARATOR); - } else { // stop once we see a dynamic partition - break; - } + String partName = null; + try { + partName = makePartNameUtil(spec, true, true); } - return suffixBuf.toString(); + catch (MetaException e) { + // This exception is not thrown when dynamic=true. This is a Noop and + // can be ignored. 
+ return partName; + } + + /** + * Given a dynamic partition specification, return the path corresponding to the + * static part of partition specification. This is basically similar to makePartName + * but we get rid of MetaException since it is not serializable. This method skips + * the trailing path separator also. + * + * @param spec + * @return string representation of the static part of the partition specification. + */ + public static String makeDynamicPartNameNoTrailingSeperator(Map spec) { + String partName = null; + try { + partName = makePartNameUtil(spec, false, true); + } + catch (MetaException e) { + // This exception is not thrown when dynamic=true. This is a Noop and + // can be ignored. + } + return partName; + } static final Pattern pat = Pattern.compile("([^/]+)=([^/]+)"); -- 2.24.3 (Apple Git-128)