diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 397a081..5436af0 100644 --- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -3187,97 +3187,14 @@ public boolean equals(Object obj) { // incorrect, an exception will be thrown before the threads which create the partition // folders are submitted. This way we can be sure that no partition and no partition // folder will be created if the list contains an invalid partition. - if (!part.getTableName().equals(tblName) || !part.getDbName().equals(dbName)) { - String errorMsg = String.format( - "Partition does not belong to target table %s. It belongs to the table %s.%s : %s", - getCatalogQualifiedTableName(catName, dbName, tblName), part.getDbName(), - part.getTableName(), part.toString()); - throw new MetaException(errorMsg); - } - if (part.getValues() == null || part.getValues().isEmpty()) { - throw new MetaException("The partition values cannot be null or empty."); - } - if (part.getValues().contains(null)) { - throw new MetaException("Partition value cannot be null."); - } - - boolean shouldAdd = startAddPartition(ms, part, ifNotExists); - if (!shouldAdd) { + if (validatePartition(part, catName, tblName, dbName, partsToAdd, ms, ifNotExists)) { + partitionsToAdd.add(part); + } else { existingParts.add(part); - LOG.info("Not adding partition {} as it already exists", part); - continue; - } - - if (!partsToAdd.add(new PartValEqWrapperLite(part))) { - // Technically, for ifNotExists case, we could insert one and discard the other - // because the first one now "exists", but it seems better to report the problem - // upstream as such a command doesn't make sense. - throw new MetaException("Duplicate partitions in the list: " + part); - } - - partitionsToAdd.add(part); - } - - final AtomicBoolean failureOccurred = new AtomicBoolean(false); - final Table table = tbl; - List> partFutures = new ArrayList<>(partitionsToAdd.size()); - - final UserGroupInformation ugi; - try { - ugi = UserGroupInformation.getCurrentUser(); - } catch (IOException e) { - throw new RuntimeException(e); - } - - for (final Partition partition : partitionsToAdd) { - initializePartitionParameters(table, partition); - - partFutures.add(threadPool.submit(() -> { - if (failureOccurred.get()) { - return null; - } - ugi.doAs((PrivilegedExceptionAction) () -> { - try { - boolean madeDir = createLocationForAddedPartition(table, partition); - addedPartitions.put(new PartValEqWrapperLite(partition), madeDir); - initializeAddedPartition(table, partition, madeDir); - } catch (MetaException e) { - throw new IOException(e.getMessage(), e); - } - return null; - }); - return partition; - })); - } - - String errorMessage = null; - for (Future partFuture : partFutures) { - try { - Partition part = partFuture.get(); - if (part != null && !failureOccurred.get()) { - newParts.add(part); - } - } catch (InterruptedException | ExecutionException e) { - // If an exception is thrown in the execution of a task, set the failureOccurred flag to - // true. This flag is visible in the tasks and if its value is true, the partition - // folders won't be created. - // Then iterate through the remaining tasks and wait for them to finish. The tasks which - // are started before the flag got set will then finish creating the partition folders. - // The tasks which are started after the flag got set, won't create the partition - // folders, to avoid unnecessary work. - // This way it is sure that all tasks are finished, when entering the finally part where - // the partition folders are cleaned up. It won't happen that a task is still running - // when cleaning up the folders, so it is sure we won't have leftover folders. - // Canceling the other tasks would be also an option but during testing it turned out - // that it is not a trustworthy solution to avoid leftover folders. - failureOccurred.compareAndSet(false, true); - errorMessage = e.getMessage(); } } - if (failureOccurred.get()) { - throw new MetaException(errorMessage); - } + newParts.addAll(createPartitionFolders(partitionsToAdd, tbl, addedPartitions)); if (!newParts.isEmpty()) { ms.addPartitions(catName, dbName, tblName, newParts); @@ -3296,12 +3213,7 @@ public boolean equals(Object obj) { } finally { if (!success) { ms.rollbackTransaction(); - for (Map.Entry e : addedPartitions.entrySet()) { - if (e.getValue()) { - // we just created this directory - it's not a case of pre-creation, so we nuke. - wh.deleteDir(new Path(e.getKey().location), true); - } - } + cleanupPartitionFolders(addedPartitions); if (!listeners.isEmpty()) { MetaStoreListenerNotifier.notifyEvent(listeners, @@ -3330,6 +3242,142 @@ public boolean equals(Object obj) { return newParts; } + /** + * Clean up the newly created partition folders. The values in the addedPartitions map indicates + * whether or not the location of the partition was newly created. If the value is false, the + * partition folder will not be cleaned up. + * @param addedPartitions + * @throws MetaException + * @throws IllegalArgumentException + */ + private void cleanupPartitionFolders(final Map addedPartitions) + throws MetaException, IllegalArgumentException { + for (Map.Entry e : addedPartitions.entrySet()) { + if (e.getValue()) { + // we just created this directory - it's not a case of pre-creation, so we nuke. + wh.deleteDir(new Path(e.getKey().location), true); + } + } + } + + private boolean validatePartition(final Partition part, final String catName, + final String tblName, final String dbName, final Set partsToAdd, + final RawStore ms, final boolean ifNotExists) throws MetaException, TException { + + if (part.getDbName() == null || part.getTableName() == null) { + throw new MetaException("The database and table name must be set in the partition."); + } + + if (!part.getTableName().equals(tblName) || !part.getDbName().equals(dbName)) { + String errorMsg = String.format( + "Partition does not belong to target table %s. It belongs to the table %s.%s : %s", + getCatalogQualifiedTableName(catName, dbName, tblName), part.getDbName(), + part.getTableName(), part.toString()); + throw new MetaException(errorMsg); + } + + if (part.getValues() == null || part.getValues().isEmpty()) { + throw new MetaException("The partition values cannot be null or empty."); + } + + if (part.getValues().contains(null)) { + throw new MetaException("Partition value cannot be null."); + } + + boolean shouldAdd = startAddPartition(ms, part, ifNotExists); + if (!shouldAdd) { + LOG.info("Not adding partition {} as it already exists", part); + return false; + } + + if (!partsToAdd.add(new PartValEqWrapperLite(part))) { + // Technically, for ifNotExists case, we could insert one and discard the other + // because the first one now "exists", but it seems better to report the problem + // upstream as such a command doesn't make sense. + throw new MetaException("Duplicate partitions in the list: " + part); + } + return true; + } + + /** + * Create the location folders for the partitions. For each partition a separate thread will be + * started to create the folder. The method will wait until all threads are finished and returns + * the partitions whose folders were created successfully. If an error occurs during the + * execution of a thread, a MetaException will be thrown. + * @param partitionsToAdd + * @param table + * @param addedPartitions + * @return + * @throws MetaException + */ + private List createPartitionFolders(final List partitionsToAdd, + final Table table, final Map addedPartitions) + throws MetaException { + + final AtomicBoolean failureOccurred = new AtomicBoolean(false); + final List> partFutures = new ArrayList<>(partitionsToAdd.size()); + + final UserGroupInformation ugi; + try { + ugi = UserGroupInformation.getCurrentUser(); + } catch (IOException e) { + throw new RuntimeException(e); + } + + for (final Partition partition : partitionsToAdd) { + initializePartitionParameters(table, partition); + + partFutures.add(threadPool.submit(() -> { + if (failureOccurred.get()) { + return null; + } + ugi.doAs((PrivilegedExceptionAction) () -> { + try { + boolean madeDir = createLocationForAddedPartition(table, partition); + addedPartitions.put(new PartValEqWrapperLite(partition), madeDir); + initializeAddedPartition(table, partition, madeDir); + } catch (MetaException e) { + throw new IOException(e.getMessage(), e); + } + return null; + }); + return partition; + })); + } + + List newParts = new ArrayList<>(partitionsToAdd.size()); + String errorMessage = null; + for (Future partFuture : partFutures) { + try { + Partition part = partFuture.get(); + if (part != null && !failureOccurred.get()) { + newParts.add(part); + } + } catch (InterruptedException | ExecutionException e) { + // If an exception is thrown in the execution of a task, set the failureOccurred flag to + // true. This flag is visible in the tasks and if its value is true, the partition + // folders won't be created. + // Then iterate through the remaining tasks and wait for them to finish. The tasks which + // are started before the flag got set will then finish creating the partition folders. + // The tasks which are started after the flag got set, won't create the partition + // folders, to avoid unnecessary work. + // This way it is sure that all tasks are finished, when entering the finally part where + // the partition folders are cleaned up. It won't happen that a task is still running + // when cleaning up the folders, so it is sure we won't have leftover folders. + // Canceling the other tasks would be also an option but during testing it turned out + // that it is not a trustworthy solution to avoid leftover folders. + failureOccurred.compareAndSet(false, true); + errorMessage = e.getMessage(); + } + } + + if (failureOccurred.get()) { + throw new MetaException(errorMessage); + } + + return newParts; + } + @Override public AddPartitionsResult add_partitions_req(AddPartitionsRequest request) throws TException { @@ -3452,94 +3500,14 @@ private int add_partitions_pspec_core(RawStore ms, String catName, String dbName // folders are submitted. This way we can be sure that no partition or partition folder // will be created if the list contains an invalid partition. final Partition part = partitionIterator.getCurrent(); - - if (part.getDbName() == null || part.getTableName() == null) { - throw new MetaException("The database and table name must be set in the partition."); - } - if (!part.getTableName().equalsIgnoreCase(tblName) || !part.getDbName().equalsIgnoreCase(dbName)) { - String errorMsg = String.format( - "Partition does not belong to target table %s.%s. It belongs to the table %s.%s : %s", - dbName, tblName, part.getDbName(), part.getTableName(), part.toString()); - throw new MetaException(errorMsg); - } - if (part.getValues() == null || part.getValues().isEmpty()) { - throw new MetaException("The partition values cannot be null or empty."); - } - - boolean shouldAdd = startAddPartition(ms, part, ifNotExists); - if (!shouldAdd) { - LOG.info("Not adding partition {} as it already exists", part); - continue; + if (validatePartition(part, catName, tblName, dbName, partsToAdd, ms, ifNotExists)) { + partitionsToAdd.add(part); } - if (!partsToAdd.add(new PartValEqWrapperLite(part))) { - // Technically, for ifNotExists case, we could insert one and discard the other - // because the first one now "exists", but it seems better to report the problem - // upstream as such a command doesn't make sense. - throw new MetaException("Duplicate partitions in the list: " + part); - } - - partitionsToAdd.add(part); partitionIterator.next(); } - final AtomicBoolean failureOccurred = new AtomicBoolean(false); - List> partFutures = new ArrayList<>(partitionsToAdd.size()); - final Table table = tbl; - - final UserGroupInformation ugi; - try { - ugi = UserGroupInformation.getCurrentUser(); - } catch (IOException e) { - throw new RuntimeException(e); - } - - for (final Partition partition : partitionsToAdd) { - initializePartitionParameters(table, partition); - - partFutures.add(threadPool.submit(() -> { - if (failureOccurred.get()) { - return null; - } - ugi.doAs((PrivilegedExceptionAction) () -> { - try { - boolean madeDir = createLocationForAddedPartition(table, partition); - addedPartitions.put(new PartValEqWrapperLite(partition), madeDir); - initializeAddedPartition(table, partition, madeDir); - } catch (MetaException e) { - throw new IOException(e.getMessage(), e); - } - return null; - }); - return partition; - })); - } - - String errorMessage = null; - for (Future partFuture : partFutures) { - try { - partFuture.get(); - } catch (InterruptedException | ExecutionException e) { - // If an exception is thrown in the execution of a task, set the failureOccurred flag to - // true. This flag is visible in the tasks and if its value is true, the partition - // folders won't be created. - // Then iterate through the remaining tasks and wait for them to finish. The tasks which - // are started before the flag got set will then finish creating the partition folders. - // The tasks which are started after the flag got set, won't create the partition - // folders, to avoid unnecessary work. - // This way it is sure that all tasks are finished, when entering the finally part where - // the partition folders are cleaned up. It won't happen that a task is still running - // when cleaning up the folders, so it is sure we won't have leftover folders. - // Canceling the other tasks would be also an option but during testing it turned out - // that it is not a trustworthy solution to avoid leftover folders. - failureOccurred.compareAndSet(false, true); - errorMessage = e.getMessage(); - } - } - - if (failureOccurred.get()) { - throw new MetaException(errorMessage); - } + createPartitionFolders(partitionsToAdd, tbl, addedPartitions); ms.addPartitions(catName, dbName, tblName, partitionSpecProxy, ifNotExists); @@ -3555,12 +3523,7 @@ private int add_partitions_pspec_core(RawStore ms, String catName, String dbName } finally { if (!success) { ms.rollbackTransaction(); - for (Map.Entry e : addedPartitions.entrySet()) { - if (e.getValue()) { - // we just created this directory - it's not a case of pre-creation, so we nuke. - wh.deleteDir(new Path(e.getKey().location), true); - } - } + cleanupPartitionFolders(addedPartitions); } if (!listeners.isEmpty()) { diff --git standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitions.java standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitions.java index 88064d9..68b5795 100644 --- standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitions.java +++ standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitions.java @@ -241,6 +241,33 @@ public void testAddPartitionUpperCase() throws Exception { Assert.assertTrue(metaStore.isPathExists(new Path(part.getSd().getLocation()))); } + @Test + public void testAddPartitionUpperCaseDBAndTableName() throws Exception { + + // Create table 'test_partition_db.test_add_part_table' + String tableName = "test_add_part_table"; + String tableLocation = metaStore.getWarehouseRoot() + "/" + tableName.toUpperCase(); + createTable(DB_NAME, tableName, getYearPartCol(), tableLocation); + + // Create partition with table name 'TEST_ADD_PART_TABLE' and db name 'TEST_PARTITION_DB' + Partition partition = buildPartition(DB_NAME.toUpperCase(), tableName.toUpperCase(), + "2013", tableLocation + "/year=2013"); + client.add_partition(partition); + + // Validate the partition attributes + // The db and table name should be all lower case: 'test_partition_db' and + // 'test_add_part_table' + // The location should be saved case-sensitive, it should be + // warehouse dir + "TEST_ADD_PART_TABLE/year=2017" + Partition part = client.getPartition(DB_NAME, tableName, "year=2013"); + Assert.assertNotNull(part); + Assert.assertEquals(tableName, part.getTableName()); + Assert.assertEquals(DB_NAME, part.getDbName()); + Assert.assertEquals(tableLocation + "/year=2013", part.getSd().getLocation()); + Partition part1 = client.getPartition(DB_NAME.toUpperCase(), tableName.toUpperCase(), "year=2013"); + Assert.assertEquals(part, part1); + } + @Test(expected = InvalidObjectException.class) public void testAddPartitionNonExistingDb() throws Exception { @@ -689,6 +716,62 @@ public void testAddPartitionsWithDefaultAttributes() throws Exception { verifyPartitionAttributesDefaultValues(part, table.getSd().getLocation()); } + @Test + public void testAddPartitionsUpperCaseDBAndTableName() throws Exception { + + // Create table 'test_partition_db.test_add_part_table' + String tableName = "test_add_part_table"; + String tableLocation = metaStore.getWarehouseRoot() + "/" + tableName.toUpperCase(); + createTable(DB_NAME, tableName, getYearPartCol(), tableLocation); + + // Create partitions with table name 'TEST_ADD_PART_TABLE' and db name 'TEST_PARTITION_DB' + Partition partition1 = buildPartition(DB_NAME.toUpperCase(), tableName.toUpperCase(), "2017", tableLocation + "/year=2017"); + Partition partition2 = buildPartition(DB_NAME.toUpperCase(), tableName.toUpperCase(), "2018", tableLocation + "/year=2018"); + client.add_partitions(Lists.newArrayList(partition1, partition2)); + + // Validate the partitions attributes + // The db and table name should be all lower case: 'test_partition_db' and + // 'test_add_part_table' + // The location should be saved case-sensitive, it should be + // warehouse dir + "TEST_ADD_PART_TABLE/year=2017" and + // warehouse dir + "TEST_ADD_PART_TABLE/year=2018" + Partition part = client.getPartition(DB_NAME, tableName, "year=2017"); + Assert.assertNotNull(part); + Assert.assertEquals(tableName, part.getTableName()); + Assert.assertEquals(DB_NAME, part.getDbName()); + Assert.assertEquals(tableLocation + "/year=2017", part.getSd().getLocation()); + part = client.getPartition(DB_NAME, tableName, "year=2018"); + Assert.assertNotNull(part); + Assert.assertEquals(tableName, part.getTableName()); + Assert.assertEquals(DB_NAME, part.getDbName()); + Assert.assertEquals(tableLocation + "/year=2018", part.getSd().getLocation()); + } + + @Test + public void testAddPartitionsUpperCaseDBAndTableNameInOnePart() throws Exception { + + // Create table 'test_partition_db.test_add_part_table' + String tableName = "test_add_part_table"; + String tableLocation = metaStore.getWarehouseRoot() + "/" + tableName.toUpperCase(); + createTable(DB_NAME, tableName, getYearPartCol(), tableLocation); + + // Create two partitions with table name 'test_add_part_table' and db name 'test_partition_db' + // Create one partition with table name 'TEST_ADD_PART_TABLE' and db name 'TEST_PARTITION_DB' + Partition partition1 = buildPartition(DB_NAME, tableName, "2017", tableLocation + "/year=2017"); + Partition partition2 = buildPartition(DB_NAME.toUpperCase(), tableName.toUpperCase(), "2018", tableLocation + "/year=2018"); + Partition partition3 = buildPartition(DB_NAME, tableName, "2019", tableLocation + "/year=2019"); + try { + client.add_partitions(Lists.newArrayList(partition1, partition2, partition3)); + Assert.fail("MetaException should have been thrown."); + } catch (MetaException e) { + // Expected exception + } + + List partitionNames = client.listPartitionNames(DB_NAME, tableName, MAX); + Assert.assertNotNull(partitionNames); + Assert.assertTrue(partitionNames.isEmpty()); + } + @Test(expected = MetaException.class) public void testAddPartitionsNullList() throws Exception { diff --git standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitionsFromPartSpec.java standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitionsFromPartSpec.java index debcd0e..2aa6ff3 100644 --- standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitionsFromPartSpec.java +++ standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitionsFromPartSpec.java @@ -143,6 +143,32 @@ public void testAddPartitionSpecWithSharedSD() throws Exception { } @Test + public void testAddPartitionSpecWithSharedSDUpperCaseDBAndTableName() throws Exception { + + Table table = createTable(); + PartitionWithoutSD partition = buildPartitionWithoutSD(Lists.newArrayList("2013"), 1); + List partitions = Lists.newArrayList(partition); + + String location = table.getSd().getLocation() + "/sharedSDTest/"; + PartitionSpec partitionSpec = new PartitionSpec(); + partitionSpec.setDbName(DB_NAME.toUpperCase()); + partitionSpec.setTableName(TABLE_NAME.toUpperCase()); + PartitionSpecWithSharedSD partitionList = new PartitionSpecWithSharedSD(); + partitionList.setPartitions(partitions); + partitionList.setSd(buildSD(location)); + partitionSpec.setSharedSDPartitionSpec(partitionList); + PartitionSpecProxy partitionSpecProxy = PartitionSpecProxy.Factory.get(partitionSpec); + client.add_partitions_pspec(partitionSpecProxy); + + Partition part = client.getPartition(DB_NAME, TABLE_NAME, "year=2013"); + Assert.assertNotNull(part); + Assert.assertEquals(DB_NAME, part.getDbName()); + Assert.assertEquals(TABLE_NAME, part.getTableName()); + Assert.assertEquals(metaStore.getWarehouseRoot() + "/" + TABLE_NAME + + "/sharedSDTest/partwithoutsd1", part.getSd().getLocation()); + } + + @Test public void testAddPartitionSpecsMultipleValues() throws Exception { Table table = createTable(DB_NAME, TABLE_NAME, getYearAndMonthPartCols(), @@ -166,6 +192,75 @@ public void testAddPartitionSpecsMultipleValues() throws Exception { verifyPartitionSharedSD(table, "year=2005/month=may", Lists.newArrayList("2005", "may"), 4); } + @Test + public void testAddPartitionSpecUpperCaseDBAndTableName() throws Exception { + + // Create table 'test_partition_db.test_add_part_table' + String tableName = "test_add_part_table"; + String tableLocation = metaStore.getWarehouseRoot() + "/" + tableName.toUpperCase(); + createTable(DB_NAME, tableName, getYearPartCol(), tableLocation); + + // Create partitions with table name 'TEST_ADD_PART_TABLE' and db name 'TEST_PARTITION_DB' + Partition partition1 = buildPartition(DB_NAME.toUpperCase(), tableName.toUpperCase(), "2013", + tableLocation + "/year=2013"); + Partition partition2 = buildPartition(DB_NAME.toUpperCase(), tableName.toUpperCase(), "2014", + tableLocation + "/year=2014"); + List partitions = Lists.newArrayList(partition1, partition2); + + String rootPath = tableLocation + "/addpartspectest/"; + PartitionSpecProxy partitionSpec = + buildPartitionSpec(DB_NAME.toUpperCase(), tableName.toUpperCase(), rootPath, partitions); + client.add_partitions_pspec(partitionSpec); + + // Validate the partition attributes + // The db and table name should be all lower case: 'test_partition_db' and + // 'test_add_part_table' + // The location should be saved case-sensitive, it should be + // warehouse dir + "TEST_ADD_PART_TABLE/year=2017" + Partition part = client.getPartition(DB_NAME, tableName, "year=2013"); + Assert.assertNotNull(part); + Assert.assertEquals(tableName, part.getTableName()); + Assert.assertEquals(DB_NAME, part.getDbName()); + Assert.assertEquals(tableLocation + "/year=2013", part.getSd().getLocation()); + Partition part1 = client.getPartition(DB_NAME.toUpperCase(), tableName.toUpperCase(), "year=2013"); + Assert.assertEquals(part, part1); + part = client.getPartition(DB_NAME, tableName, "year=2014"); + Assert.assertNotNull(part); + Assert.assertEquals(tableName, part.getTableName()); + Assert.assertEquals(DB_NAME, part.getDbName()); + Assert.assertEquals(tableLocation + "/year=2014", part.getSd().getLocation()); + part1 = client.getPartition(DB_NAME.toUpperCase(), tableName.toUpperCase(), "year=2014"); + Assert.assertEquals(part, part1); + } + + @Test + public void testAddPartitionSpecUpperCaseDBAndTableNameInOnePart() throws Exception { + + String tableName = "test_add_part_table"; + String tableLocation = metaStore.getWarehouseRoot() + "/" + tableName.toUpperCase(); + createTable(DB_NAME, tableName, getYearPartCol(), tableLocation); + + Partition partition1 = buildPartition(DB_NAME, tableName, "2013", + tableLocation + "/year=2013"); + Partition partition2 = buildPartition(DB_NAME.toUpperCase(), tableName.toUpperCase(), "2014", + tableLocation + "/year=2014"); + List partitions = Lists.newArrayList(partition1, partition2); + + String rootPath = tableLocation + "/addpartspectest/"; + PartitionSpecProxy partitionSpec = buildPartitionSpec(DB_NAME, tableName, rootPath, partitions); + + try { + client.add_partitions_pspec(partitionSpec); + Assert.fail("MetaException should have been thrown."); + } catch (MetaException e) { + // Expected exception + } + + List partitionNames = client.listPartitionNames(DB_NAME, tableName, MAX); + Assert.assertNotNull(partitionNames); + Assert.assertTrue(partitionNames.isEmpty()); + } + // TODO add tests for partitions in other catalogs @Test(expected = MetaException.class)