From d1bd5cf4996c356b525b2946978eb44157c15cd4 Mon Sep 17 00:00:00 2001 From: Guang Yang Date: Fri, 2 Nov 2018 16:21:35 -0700 Subject: [PATCH] HIVE-16839: Fix a race condidtion during concurrent partition drops --- .../apache/hadoop/hive/metastore/ObjectStore.java | 51 +++++++++------- .../hadoop/hive/metastore/TestObjectStore.java | 67 ++++++++++++++++++++++ 2 files changed, 96 insertions(+), 22 deletions(-) diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java index 03e3a2d257..8ba2fbcd85 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -2440,34 +2440,41 @@ public Partition getPartition(String catName, String dbName, String tableName, List part_vals, String validWriteIds) throws NoSuchObjectException, MetaException { - openTransaction(); - MTable table = this.getMTable(catName, dbName, tableName); - MPartition mpart = getMPartition(catName, dbName, tableName, part_vals); - Partition part = convertToPart(mpart); - commitTransaction(); - if(part == null) { - throw new NoSuchObjectException("partition values=" + Partition part = null; + boolean committed = false; + try { + openTransaction(); + MTable table = this.getMTable(catName, dbName, tableName); + MPartition mpart = getMPartition(catName, dbName, tableName, part_vals); + part = convertToPart(mpart); + committed = commitTransaction(); + if (part == null) { + throw new NoSuchObjectException("partition values=" + part_vals.toString()); - } - part.setValues(part_vals); - // If transactional table partition, check whether the current version partition - // statistics in the metastore comply with the client query's snapshot isolation. - long statsWriteId = mpart.getWriteId(); - if (TxnUtils.isTransactionalTable(table.getParameters())) { - if (!areTxnStatsSupported) { - // Do not make persistent the following state since it is query specific (not global). - StatsSetupConst.setBasicStatsState(part.getParameters(), StatsSetupConst.FALSE); - LOG.info("Removed COLUMN_STATS_ACCURATE from Partition object's parameters."); - } else if (validWriteIds != null) { - if (isCurrentStatsValidForTheQuery(part, statsWriteId, validWriteIds, false)) { - part.setIsStatsCompliant(true); - } else { - part.setIsStatsCompliant(false); + } + + part.setValues(part_vals); + // If transactional table partition, check whether the current version partition + // statistics in the metastore comply with the client query's snapshot isolation. + long statsWriteId = mpart.getWriteId(); + if (TxnUtils.isTransactionalTable(table.getParameters())) { + if (!areTxnStatsSupported) { // Do not make persistent the following state since it is query specific (not global). StatsSetupConst.setBasicStatsState(part.getParameters(), StatsSetupConst.FALSE); LOG.info("Removed COLUMN_STATS_ACCURATE from Partition object's parameters."); + } else if (validWriteIds != null) { + if (isCurrentStatsValidForTheQuery(part, statsWriteId, validWriteIds, false)) { + part.setIsStatsCompliant(true); + } else { + part.setIsStatsCompliant(false); + // Do not make persistent the following state since it is query specific (not global). + StatsSetupConst.setBasicStatsState(part.getParameters(), StatsSetupConst.FALSE); + LOG.info("Removed COLUMN_STATS_ACCURATE from Partition object's parameters."); + } } } + } finally { + rollbackAndCleanup(committed, (Query)null); } return part; } diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java index b74c3048fa..2e36d8509d 100644 --- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java @@ -85,6 +85,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.concurrent.BrokenBarrierException; @@ -361,6 +362,72 @@ public void testPartitionOps() throws MetaException, InvalidObjectException, } /** + * Test the concurrent drop of same partition would leak transaction. + * https://issues.apache.org/jira/browse/HIVE-16839 + * + * Note: the leak happens during a race condition, this test case tries + * to simulate the race condition on best effort, it have two threads trying + * to drop the same set of partitions + */ + @Test + public void testConcurrentDropPartitions() throws MetaException, InvalidObjectException { + Database db1 = new DatabaseBuilder() + .setName(DB1) + .setDescription("description") + .setLocation("locationurl") + .build(conf); + objectStore.createDatabase(db1); + StorageDescriptor sd = createFakeSd("location"); + HashMap tableParams = new HashMap<>(); + tableParams.put("EXTERNAL", "false"); + FieldSchema partitionKey1 = new FieldSchema("Country", ColumnType.STRING_TYPE_NAME, ""); + FieldSchema partitionKey2 = new FieldSchema("State", ColumnType.STRING_TYPE_NAME, ""); + Table tbl1 = + new Table(TABLE1, DB1, "owner", 1, 2, 3, sd, Arrays.asList(partitionKey1, partitionKey2), + tableParams, null, null, "MANAGED_TABLE"); + objectStore.createTable(tbl1); + HashMap partitionParams = new HashMap<>(); + partitionParams.put("PARTITION_LEVEL_PRIVILEGE", "true"); + + // Create some partitions + List> partNames = new LinkedList<>(); + for (char c = 'A'; c < 'Z'; c++) { + String name = "" + c; + partNames.add(Arrays.asList(name, name)); + } + for (List n : partNames) { + Partition p = new Partition(n, DB1, TABLE1, 111, 111, sd, partitionParams); + p.setCatName(DEFAULT_CATALOG_NAME); + objectStore.addPartition(p); + } + + int numThreads = 2; + ExecutorService executorService = Executors.newFixedThreadPool(numThreads); + for (int i = 0; i < numThreads; i++) { + executorService.execute( + () -> { + for (List p : partNames) { + try { + objectStore.dropPartition(DEFAULT_CATALOG_NAME, DB1, TABLE1, p); + System.out.println("Dropping partition: " + p.get(0)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + ); + } + + executorService.shutdown(); + try { + executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS.NANOSECONDS); + } catch (InterruptedException ex) { + Assert.assertTrue("Got interrupted.", false); + } + Assert.assertTrue("Expect no active transactions.", !objectStore.isActiveTransaction()); + } + + /** * Checks if the JDO cache is able to handle directSQL partition drops in one session. * @throws MetaException * @throws InvalidObjectException -- 2.13.2