diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index ae9ec5c..133249d 100644 --- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -3243,91 +3243,16 @@ public boolean equals(Object obj) { // incorrect, an exception will be thrown before the threads which create the partition // folders are submitted. This way we can be sure that no partition and no partition // folder will be created if the list contains an invalid partition. - if (!part.getTableName().equals(tblName) || !part.getDbName().equals(dbName)) { - String errorMsg = String.format( - "Partition does not belong to target table %s. It belongs to the table %s.%s : %s", - getCatalogQualifiedTableName(catName, dbName, tblName), part.getDbName(), - part.getTableName(), part.toString()); - throw new MetaException(errorMsg); - } - - boolean shouldAdd = startAddPartition(ms, part, ifNotExists); - if (!shouldAdd) { + boolean validPartition = validatePartition(part, catName, tblName, dbName, + partsToAdd, ms, ifNotExists); + if (validPartition) { + partitionsToAdd.add(part); + } else { existingParts.add(part); - LOG.info("Not adding partition {} as it already exists", part); - continue; - } - - if (!partsToAdd.add(new PartValEqWrapperLite(part))) { - // Technically, for ifNotExists case, we could insert one and discard the other - // because the first one now "exists", but it seems better to report the problem - // upstream as such a command doesn't make sense. 
- throw new MetaException("Duplicate partitions in the list: " + part); - } - - partitionsToAdd.add(part); - } - - final AtomicBoolean failureOccurred = new AtomicBoolean(false); - final Table table = tbl; - List> partFutures = new ArrayList<>(partitionsToAdd.size()); - - final UserGroupInformation ugi; - try { - ugi = UserGroupInformation.getCurrentUser(); - } catch (IOException e) { - throw new RuntimeException(e); - } - - for (final Partition partition : partitionsToAdd) { - initializePartitionParameters(table, partition); - - partFutures.add(threadPool.submit(() -> { - if (failureOccurred.get()) { - return null; - } - ugi.doAs((PrivilegedExceptionAction) () -> { - try { - boolean madeDir = createLocationForAddedPartition(table, partition); - addedPartitions.put(new PartValEqWrapperLite(partition), madeDir); - initializeAddedPartition(table, partition, madeDir); - } catch (MetaException e) { - throw new IOException(e.getMessage(), e); - } - return null; - }); - return partition; - })); - } - - String errorMessage = null; - for (Future partFuture : partFutures) { - try { - Partition part = partFuture.get(); - if (part != null && !failureOccurred.get()) { - newParts.add(part); - } - } catch (InterruptedException | ExecutionException e) { - // If an exception is thrown in the execution of a task, set the failureOccurred flag to - // true. This flag is visible in the tasks and if its value is true, the partition - // folders won't be created. - // Then iterate through the remaining tasks and wait for them to finish. The tasks which - // are started before the flag got set will then finish creating the partition folders. - // The tasks which are started after the flag got set, won't create the partition - // folders, to avoid unnecessary work. - // This way it is sure that all tasks are finished, when entering the finally part where - // the partition folders are cleaned up. 
It won't happen that a task is still running - // when cleaning up the folders, so it is sure we won't have leftover folders. - // Canceling the other tasks would be also an option but during testing it turned out - // that it is not a trustworthy solution to avoid leftover folders. - failureOccurred.compareAndSet(false, true); - errorMessage = e.getMessage(); } } - if (failureOccurred.get()) { - throw new MetaException(errorMessage); - } + newParts.addAll(createPartitionFolders(partitionsToAdd, tbl, addedPartitions)); if (!newParts.isEmpty()) { ms.addPartitions(catName, dbName, tblName, newParts); @@ -3346,12 +3271,7 @@ public boolean equals(Object obj) { } finally { if (!success) { ms.rollbackTransaction(); - for (Map.Entry e : addedPartitions.entrySet()) { - if (e.getValue()) { - // we just created this directory - it's not a case of pre-creation, so we nuke. - wh.deleteDir(new Path(e.getKey().location), true); - } - } + cleanupPartitionFolders(addedPartitions); if (!listeners.isEmpty()) { MetaStoreListenerNotifier.notifyEvent(listeners, @@ -3380,6 +3300,111 @@ public boolean equals(Object obj) { return newParts; } + private void cleanupPartitionFolders(final Map addedPartitions) + throws MetaException, IllegalArgumentException { + for (Map.Entry e : addedPartitions.entrySet()) { + if (e.getValue()) { + // we just created this directory - it's not a case of pre-creation, so we nuke. + wh.deleteDir(new Path(e.getKey().location), true); + } + } + } + + private boolean validatePartition(final Partition part, final String catName, + final String tblName, final String dbName, final Set partsToAdd, + final RawStore ms, final boolean ifNotExists) throws MetaException, TException { + + if (!part.getTableName().equals(tblName) || !part.getDbName().equals(dbName)) { + String errorMsg = String.format( + "Partition does not belong to target table %s. 
It belongs to the table %s.%s : %s", + getCatalogQualifiedTableName(catName, dbName, tblName), part.getDbName(), + part.getTableName(), part.toString()); + throw new MetaException(errorMsg); + } + + boolean shouldAdd = startAddPartition(ms, part, ifNotExists); + if (!shouldAdd) { + LOG.info("Not adding partition {} as it already exists", part); + return false; + } + + if (!partsToAdd.add(new PartValEqWrapperLite(part))) { + // Technically, for ifNotExists case, we could insert one and discard the other + // because the first one now "exists", but it seems better to report the problem + // upstream as such a command doesn't make sense. + throw new MetaException("Duplicate partitions in the list: " + part); + } + return true; + } + + private List createPartitionFolders(final List partitionsToAdd, + final Table table, final Map addedPartitions) + throws MetaException { + + final AtomicBoolean failureOccurred = new AtomicBoolean(false); + final List> partFutures = new ArrayList<>(partitionsToAdd.size()); + + final UserGroupInformation ugi; + try { + ugi = UserGroupInformation.getCurrentUser(); + } catch (IOException e) { + throw new RuntimeException(e); + } + + for (final Partition partition : partitionsToAdd) { + initializePartitionParameters(table, partition); + + partFutures.add(threadPool.submit(() -> { + if (failureOccurred.get()) { + return null; + } + ugi.doAs((PrivilegedExceptionAction) () -> { + try { + boolean madeDir = createLocationForAddedPartition(table, partition); + addedPartitions.put(new PartValEqWrapperLite(partition), madeDir); + initializeAddedPartition(table, partition, madeDir); + } catch (MetaException e) { + throw new IOException(e.getMessage(), e); + } + return null; + }); + return partition; + })); + } + + List newParts = new ArrayList<>(partitionsToAdd.size()); + String errorMessage = null; + for (Future partFuture : partFutures) { + try { + Partition part = partFuture.get(); + if (part != null && !failureOccurred.get()) { + 
newParts.add(part); + } + } catch (InterruptedException | ExecutionException e) { + // If an exception is thrown in the execution of a task, set the failureOccurred flag to + // true. This flag is visible in the tasks and if its value is true, the partition + // folders won't be created. + // Then iterate through the remaining tasks and wait for them to finish. The tasks which + // are started before the flag got set will then finish creating the partition folders. + // The tasks which are started after the flag got set, won't create the partition + // folders, to avoid unnecessary work. + // This way it is sure that all tasks are finished, when entering the finally part where + // the partition folders are cleaned up. It won't happen that a task is still running + // when cleaning up the folders, so it is sure we won't have leftover folders. + // Canceling the other tasks would be also an option but during testing it turned out + // that it is not a trustworthy solution to avoid leftover folders. + failureOccurred.compareAndSet(false, true); + errorMessage = e.getMessage(); + } + } + + if (failureOccurred.get()) { + throw new MetaException(errorMessage); + } + + return newParts; + } + @Override public AddPartitionsResult add_partitions_req(AddPartitionsRequest request) throws TException { @@ -3495,88 +3520,16 @@ private int add_partitions_pspec_core(RawStore ms, String catName, String dbName // folders are submitted. This way we can be sure that no partition or partition folder // will be created if the list contains an invalid partition. final Partition part = partitionIterator.getCurrent(); - - if (!part.getTableName().equalsIgnoreCase(tblName) || !part.getDbName().equalsIgnoreCase(dbName)) { - String errorMsg = String.format( - "Partition does not belong to target table %s.%s. 
It belongs to the table %s.%s : %s", - dbName, tblName, part.getDbName(), part.getTableName(), part.toString()); - throw new MetaException(errorMsg); - } - - boolean shouldAdd = startAddPartition(ms, part, ifNotExists); - if (!shouldAdd) { - LOG.info("Not adding partition {} as it already exists", part); - continue; + boolean validPartition = validatePartition(part, catName, tblName, dbName, + partsToAdd, ms, ifNotExists); + if (validPartition) { + partitionsToAdd.add(part); } - if (!partsToAdd.add(new PartValEqWrapperLite(part))) { - // Technically, for ifNotExists case, we could insert one and discard the other - // because the first one now "exists", but it seems better to report the problem - // upstream as such a command doesn't make sense. - throw new MetaException("Duplicate partitions in the list: " + part); - } - - partitionsToAdd.add(part); partitionIterator.next(); } - final AtomicBoolean failureOccurred = new AtomicBoolean(false); - List> partFutures = new ArrayList<>(partitionsToAdd.size()); - final Table table = tbl; - - final UserGroupInformation ugi; - try { - ugi = UserGroupInformation.getCurrentUser(); - } catch (IOException e) { - throw new RuntimeException(e); - } - - for (final Partition partition : partitionsToAdd) { - initializePartitionParameters(table, partition); - - partFutures.add(threadPool.submit(() -> { - if (failureOccurred.get()) { - return null; - } - ugi.doAs((PrivilegedExceptionAction) () -> { - try { - boolean madeDir = createLocationForAddedPartition(table, partition); - addedPartitions.put(new PartValEqWrapperLite(partition), madeDir); - initializeAddedPartition(table, partition, madeDir); - } catch (MetaException e) { - throw new IOException(e.getMessage(), e); - } - return null; - }); - return partition; - })); - } - - String errorMessage = null; - for (Future partFuture : partFutures) { - try { - partFuture.get(); - } catch (InterruptedException | ExecutionException e) { - // If an exception is thrown in the execution of a 
task, set the failureOccurred flag to - // true. This flag is visible in the tasks and if its value is true, the partition - // folders won't be created. - // Then iterate through the remaining tasks and wait for them to finish. The tasks which - // are started before the flag got set will then finish creating the partition folders. - // The tasks which are started after the flag got set, won't create the partition - // folders, to avoid unnecessary work. - // This way it is sure that all tasks are finished, when entering the finally part where - // the partition folders are cleaned up. It won't happen that a task is still running - // when cleaning up the folders, so it is sure we won't have leftover folders. - // Canceling the other tasks would be also an option but during testing it turned out - // that it is not a trustworthy solution to avoid leftover folders. - failureOccurred.compareAndSet(false, true); - errorMessage = e.getMessage(); - } - } - - if (failureOccurred.get()) { - throw new MetaException(errorMessage); - } + createPartitionFolders(partitionsToAdd, tbl, addedPartitions); ms.addPartitions(catName, dbName, tblName, partitionSpecProxy, ifNotExists); @@ -3592,12 +3545,7 @@ private int add_partitions_pspec_core(RawStore ms, String catName, String dbName } finally { if (!success) { ms.rollbackTransaction(); - for (Map.Entry e : addedPartitions.entrySet()) { - if (e.getValue()) { - // we just created this directory - it's not a case of pre-creation, so we nuke. 
- wh.deleteDir(new Path(e.getKey().location), true); - } - } + cleanupPartitionFolders(addedPartitions); } if (!listeners.isEmpty()) { diff --git standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitions.java standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitions.java index f8497c7..09e349d 100644 --- standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitions.java +++ standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitions.java @@ -242,6 +242,33 @@ public void testAddPartitionUpperCase() throws Exception { Assert.assertTrue(metaStore.isPathExists(new Path(part.getSd().getLocation()))); } + @Test + public void testAddPartitionUpperCaseDBAndTableName() throws Exception { + + // Create table 'test_partition_db.test_add_part_table' + String tableName = "test_add_part_table"; + String tableLocation = metaStore.getWarehouseRoot() + "/" + tableName.toUpperCase(); + createTable(DB_NAME, tableName, getYearPartCol(), tableLocation); + + // Create partition with table name 'TEST_ADD_PART_TABLE' and db name 'TEST_PARTITION_DB' + Partition partition = buildPartition(DB_NAME.toUpperCase(), tableName.toUpperCase(), + "2013", tableLocation + "/year=2013"); + client.add_partition(partition); + + // Validate the partition attributes + // The db and table name should be all lower case: 'test_partition_db' and + // 'test_add_part_table' + // The location should be saved case-sensitive, it should be + // warehouse dir + "TEST_ADD_PART_TABLE/year=2013" + Partition part = client.getPartition(DB_NAME, tableName, "year=2013"); + Assert.assertNotNull(part); + Assert.assertEquals(tableName, part.getTableName()); + Assert.assertEquals(DB_NAME, part.getDbName()); + Assert.assertEquals(tableLocation + "/year=2013", part.getSd().getLocation()); + Partition part1 = client.getPartition(DB_NAME.toUpperCase(), tableName.toUpperCase(), "year=2013"); + 
Assert.assertEquals(part, part1); + } + @Test(expected = InvalidObjectException.class) public void testAddPartitionNonExistingDb() throws Exception { @@ -512,7 +539,7 @@ public void testAddPartitionTooManyValues() throws Exception { @Test(expected = MetaException.class) public void testAddPartitionNoPartColOnTable() throws Exception { - Table origTable = new TableBuilder() + new TableBuilder() .setDbName(DB_NAME) .setTableName(TABLE_NAME) .addCol("test_id", "int", "test col id") @@ -699,6 +726,62 @@ public void testAddPartitionsWithDefaultAttributes() throws Exception { } @Test + public void testAddPartitionsUpperCaseDBAndTableName() throws Exception { + + // Create table 'test_partition_db.test_add_part_table' + String tableName = "test_add_part_table"; + String tableLocation = metaStore.getWarehouseRoot() + "/" + tableName.toUpperCase(); + createTable(DB_NAME, tableName, getYearPartCol(), tableLocation); + + // Create partitions with table name 'TEST_ADD_PART_TABLE' and db name 'TEST_PARTITION_DB' + Partition partition1 = buildPartition(DB_NAME.toUpperCase(), tableName.toUpperCase(), "2017", tableLocation + "/year=2017"); + Partition partition2 = buildPartition(DB_NAME.toUpperCase(), tableName.toUpperCase(), "2018", tableLocation + "/year=2018"); + client.add_partitions(Lists.newArrayList(partition1, partition2)); + + // Validate the partitions attributes + // The db and table name should be all lower case: 'test_partition_db' and + // 'test_add_part_table' + // The location should be saved case-sensitive, it should be + // warehouse dir + "TEST_ADD_PART_TABLE/year=2017" and + // warehouse dir + "TEST_ADD_PART_TABLE/year=2018" + Partition part = client.getPartition(DB_NAME, tableName, "year=2017"); + Assert.assertNotNull(part); + Assert.assertEquals(tableName, part.getTableName()); + Assert.assertEquals(DB_NAME, part.getDbName()); + Assert.assertEquals(tableLocation + "/year=2017", part.getSd().getLocation()); + part = client.getPartition(DB_NAME, tableName, 
"year=2018"); + Assert.assertNotNull(part); + Assert.assertEquals(tableName, part.getTableName()); + Assert.assertEquals(DB_NAME, part.getDbName()); + Assert.assertEquals(tableLocation + "/year=2018", part.getSd().getLocation()); + } + + @Test + public void testAddPartitionsUpperCaseDBAndTableNameInOnePart() throws Exception { + + // Create table 'test_partition_db.test_add_part_table' + String tableName = "test_add_part_table"; + String tableLocation = metaStore.getWarehouseRoot() + "/" + tableName.toUpperCase(); + createTable(DB_NAME, tableName, getYearPartCol(), tableLocation); + + // Create two partitions with table name 'test_add_part_table' and db name 'test_partition_db' + // Create one partition with table name 'TEST_ADD_PART_TABLE' and db name 'TEST_PARTITION_DB' + Partition partition1 = buildPartition(DB_NAME, tableName, "2017", tableLocation + "/year=2017"); + Partition partition2 = buildPartition(DB_NAME.toUpperCase(), tableName.toUpperCase(), "2018", tableLocation + "/year=2018"); + Partition partition3 = buildPartition(DB_NAME, tableName, "2019", tableLocation + "/year=2019"); + try { + client.add_partitions(Lists.newArrayList(partition1, partition2, partition3)); + Assert.fail("MetaException should have been thrown."); + } catch (MetaException e) { + // Expected exception + } + + List partitionNames = client.listPartitionNames(DB_NAME, tableName, MAX); + Assert.assertNotNull(partitionNames); + Assert.assertTrue(partitionNames.isEmpty()); + } + + @Test public void testAddPartitionsNullList() throws Exception { try { client.add_partitions(null); diff --git standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitionsFromPartSpec.java standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitionsFromPartSpec.java index fc0c60f..fc95908 100644 --- standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitionsFromPartSpec.java +++ 
standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitionsFromPartSpec.java @@ -145,6 +145,32 @@ public void testAddPartitionSpecWithSharedSD() throws Exception { } @Test + public void testAddPartitionSpecWithSharedSDUpperCaseDBAndTableName() throws Exception { + + Table table = createTable(); + PartitionWithoutSD partition = buildPartitionWithoutSD(Lists.newArrayList("2013"), 1); + List partitions = Lists.newArrayList(partition); + + String location = table.getSd().getLocation() + "/sharedSDTest/"; + PartitionSpec partitionSpec = new PartitionSpec(); + partitionSpec.setDbName(DB_NAME.toUpperCase()); + partitionSpec.setTableName(TABLE_NAME.toUpperCase()); + PartitionSpecWithSharedSD partitionList = new PartitionSpecWithSharedSD(); + partitionList.setPartitions(partitions); + partitionList.setSd(buildSD(location)); + partitionSpec.setSharedSDPartitionSpec(partitionList); + PartitionSpecProxy partitionSpecProxy = PartitionSpecProxy.Factory.get(partitionSpec); + client.add_partitions_pspec(partitionSpecProxy); + + Partition part = client.getPartition(DB_NAME, TABLE_NAME, "year=2013"); + Assert.assertNotNull(part); + Assert.assertEquals(DB_NAME, part.getDbName()); + Assert.assertEquals(TABLE_NAME, part.getTableName()); + Assert.assertEquals(metaStore.getWarehouseRoot() + "/" + TABLE_NAME + + "/sharedSDTest/partwithoutsd1", part.getSd().getLocation()); + } + + @Test public void testAddPartitionSpecsMultipleValues() throws Exception { Table table = createTable(DB_NAME, TABLE_NAME, getYearAndMonthPartCols(), @@ -168,6 +194,75 @@ public void testAddPartitionSpecsMultipleValues() throws Exception { verifyPartitionSharedSD(table, "year=2005/month=may", Lists.newArrayList("2005", "may"), 4); } + @Test + public void testAddPartitionSpecUpperCaseDBAndTableName() throws Exception { + + // Create table 'test_partition_db.test_add_part_table' + String tableName = "test_add_part_table"; + String tableLocation = metaStore.getWarehouseRoot() + "/" + 
tableName.toUpperCase(); + createTable(DB_NAME, tableName, getYearPartCol(), tableLocation); + + // Create partitions with table name 'TEST_ADD_PART_TABLE' and db name 'TEST_PARTITION_DB' + Partition partition1 = buildPartition(DB_NAME.toUpperCase(), tableName.toUpperCase(), "2013", + tableLocation + "/year=2013"); + Partition partition2 = buildPartition(DB_NAME.toUpperCase(), tableName.toUpperCase(), "2014", + tableLocation + "/year=2014"); + List partitions = Lists.newArrayList(partition1, partition2); + + String rootPath = tableLocation + "/addpartspectest/"; + PartitionSpecProxy partitionSpec = + buildPartitionSpec(DB_NAME.toUpperCase(), tableName.toUpperCase(), rootPath, partitions); + client.add_partitions_pspec(partitionSpec); + + // Validate the partition attributes + // The db and table name should be all lower case: 'test_partition_db' and + // 'test_add_part_table' + // The location should be saved case-sensitive, it should be + // warehouse dir + "TEST_ADD_PART_TABLE/year=2013" and "TEST_ADD_PART_TABLE/year=2014" + Partition part = client.getPartition(DB_NAME, tableName, "year=2013"); + Assert.assertNotNull(part); + Assert.assertEquals(tableName, part.getTableName()); + Assert.assertEquals(DB_NAME, part.getDbName()); + Assert.assertEquals(tableLocation + "/year=2013", part.getSd().getLocation()); + Partition part1 = client.getPartition(DB_NAME.toUpperCase(), tableName.toUpperCase(), "year=2013"); + Assert.assertEquals(part, part1); + part = client.getPartition(DB_NAME, tableName, "year=2014"); + Assert.assertNotNull(part); + Assert.assertEquals(tableName, part.getTableName()); + Assert.assertEquals(DB_NAME, part.getDbName()); + Assert.assertEquals(tableLocation + "/year=2014", part.getSd().getLocation()); + part1 = client.getPartition(DB_NAME.toUpperCase(), tableName.toUpperCase(), "year=2014"); + Assert.assertEquals(part, part1); + } + + @Test + public void testAddPartitionSpecUpperCaseDBAndTableNameInOnePart() throws Exception { + + String tableName = "test_add_part_table"; + String 
tableLocation = metaStore.getWarehouseRoot() + "/" + tableName.toUpperCase(); + createTable(DB_NAME, tableName, getYearPartCol(), tableLocation); + + Partition partition1 = buildPartition(DB_NAME, tableName, "2013", + tableLocation + "/year=2013"); + Partition partition2 = buildPartition(DB_NAME.toUpperCase(), tableName.toUpperCase(), "2014", + tableLocation + "/year=2014"); + List partitions = Lists.newArrayList(partition1, partition2); + + String rootPath = tableLocation + "/addpartspectest/"; + PartitionSpecProxy partitionSpec = buildPartitionSpec(DB_NAME, tableName, rootPath, partitions); + + try { + client.add_partitions_pspec(partitionSpec); + Assert.fail("MetaException should have been thrown."); + } catch (MetaException e) { + // Expected exception + } + + List partitionNames = client.listPartitionNames(DB_NAME, tableName, MAX); + Assert.assertNotNull(partitionNames); + Assert.assertTrue(partitionNames.isEmpty()); + } + // TODO add tests for partitions in other catalogs @Test(expected = NullPointerException.class)