diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 8539fea..b0504da 100644 --- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -3114,54 +3114,15 @@ public Partition append_partition_with_environment_context(final String dbName, return ret; } - private static class PartValEqWrapper { - Partition partition; - - PartValEqWrapper(Partition partition) { - this.partition = partition; - } - - @Override - public int hashCode() { - return partition.isSetValues() ? partition.getValues().hashCode() : 0; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null || !(obj instanceof PartValEqWrapper)) { - return false; - } - Partition p1 = this.partition, p2 = ((PartValEqWrapper)obj).partition; - if (!p1.isSetValues() || !p2.isSetValues()) { - return p1.isSetValues() == p2.isSetValues(); - } - if (p1.getValues().size() != p2.getValues().size()) { - return false; - } - for (int i = 0; i < p1.getValues().size(); ++i) { - String v1 = p1.getValues().get(i); - String v2 = p2.getValues().get(i); - if (v1 == null && v2 == null) { - continue; - } - if (v1 == null || !v1.equals(v2)) { - return false; - } - } - return true; - } - } - private static class PartValEqWrapperLite { List values; String location; PartValEqWrapperLite(Partition partition) { this.values = partition.isSetValues()? partition.getValues() : null; - this.location = partition.getSd().getLocation(); + if (partition.getSd() != null) { + this.location = partition.getSd().getLocation(); + } } @Override @@ -3209,7 +3170,7 @@ public boolean equals(Object obj) { logInfo("add_partitions"); boolean success = false; // Ensures that the list doesn't have dups, and keeps track of directories we have created. - final Map addedPartitions = new ConcurrentHashMap<>(); + final Map addedPartitions = new ConcurrentHashMap<>(); final List newParts = new ArrayList<>(); final List existingParts = new ArrayList<>(); Table tbl = null; @@ -3228,78 +3189,103 @@ public boolean equals(Object obj) { firePreEvent(new PreAddPartitionEvent(tbl, parts, this)); } - List> partFutures = Lists.newArrayList(); - final Table table = tbl; + Set partsToAdd = new HashSet<>(parts.size()); + List partitionsToAdd = new ArrayList<>(parts.size()); for (final Partition part : parts) { + // Iterate through the partitions and validate them. If one of the partitions is + // incorrect, an exception will be thrown before the threads which create the partition + // folders are submitted. This way we can be sure that no partition and no partition + // folder will be created if the list contains an invalid partition. if (!part.getTableName().equals(tblName) || !part.getDbName().equals(dbName)) { - throw new MetaException("Partition does not belong to target table " + - getCatalogQualifiedTableName(catName, dbName, tblName) + ": " + - part); + String errorMsg = String.format( + "Partition does not belong to target table %s. It belongs to the table %s.%s : %s", + getCatalogQualifiedTableName(catName, dbName, tblName), part.getDbName(), + part.getTableName(), part.toString()); + throw new MetaException(errorMsg); } boolean shouldAdd = startAddPartition(ms, part, ifNotExists); if (!shouldAdd) { existingParts.add(part); - LOG.info("Not adding partition " + part + " as it already exists"); + LOG.info("Not adding partition {} as it already exists", part); continue; } - final UserGroupInformation ugi; - try { - ugi = UserGroupInformation.getCurrentUser(); - } catch (IOException e) { - throw new RuntimeException(e); - } - - partFutures.add(threadPool.submit(new Callable() { - @Override - public Partition call() throws Exception { - ugi.doAs(new PrivilegedExceptionAction() { - @Override - public Object run() throws Exception { - try { - boolean madeDir = createLocationForAddedPartition(table, part); - if (addedPartitions.put(new PartValEqWrapper(part), madeDir) != null) { - // Technically, for ifNotExists case, we could insert one and discard the other - // because the first one now "exists", but it seems better to report the problem - // upstream as such a command doesn't make sense. - throw new MetaException("Duplicate partitions in the list: " + part); - } - initializeAddedPartition(table, part, madeDir); - } catch (MetaException e) { - throw new IOException(e.getMessage(), e); - } - return null; - } - }); - return part; + if (!partsToAdd.add(new PartValEqWrapperLite(part))) { + // Technically, for ifNotExists case, we could insert one and discard the other + // because the first one now "exists", but it seems better to report the problem + // upstream as such a command doesn't make sense. + throw new MetaException("Duplicate partitions in the list: " + part); + } + + partitionsToAdd.add(part); + } + + final AtomicBoolean failureOccurred = new AtomicBoolean(false); + final Table table = tbl; + List> partFutures = new ArrayList<>(partitionsToAdd.size()); + + final UserGroupInformation ugi; + try { + ugi = UserGroupInformation.getCurrentUser(); + } catch (IOException e) { + throw new RuntimeException(e); + } + + for (final Partition partition : partitionsToAdd) { + initializePartitionParameters(table, partition); + + partFutures.add(threadPool.submit(() -> { + if (failureOccurred.get()) { + return null; } + ugi.doAs((PrivilegedExceptionAction) () -> { + try { + boolean madeDir = createLocationForAddedPartition(table, partition); + addedPartitions.put(new PartValEqWrapperLite(partition), madeDir); + initializeAddedPartition(table, partition, madeDir); + } catch (MetaException e) { + throw new IOException(e.getMessage(), e); + } + return null; + }); + return partition; })); } - try { - for (Future partFuture : partFutures) { + String errorMessage = null; + for (Future partFuture : partFutures) { + try { Partition part = partFuture.get(); - if (part != null) { + if (part != null && !failureOccurred.get()) { newParts.add(part); } + } catch (InterruptedException | ExecutionException e) { + // If an exception is thrown in the execution of a task, set the failureOccurred flag to + // true. This flag is visible in the tasks and if its value is true, the partition + // folders won't be created. + // Then iterate through the remaining tasks and wait for them to finish. The tasks which + // are started before the flag got set will then finish creating the partition folders. + // The tasks which are started after the flag got set, won't create the partition + // folders, to avoid unnecessary work. + // This way it is sure that all tasks are finished, when entering the finally part where + // the partition folders are cleaned up. It won't happen that a task is still running + // when cleaning up the folders, so it is sure we won't have leftover folders. + // Canceling the other tasks would be also an option but during testing it turned out + // that it is not a trustworthy solution to avoid leftover folders. + failureOccurred.compareAndSet(false, true); + errorMessage = e.getMessage(); } - } catch (InterruptedException | ExecutionException e) { - // cancel other tasks - for (Future partFuture : partFutures) { - partFuture.cancel(true); - } - throw new MetaException(e.getMessage()); + } + + if (failureOccurred.get()) { + throw new MetaException(errorMessage); } if (!newParts.isEmpty()) { - success = ms.addPartitions(catName, dbName, tblName, newParts); - } else { - success = true; + ms.addPartitions(catName, dbName, tblName, newParts); } - // Setting success to false to make sure that if the listener fails, rollback happens. - success = false; // Notification is generated for newly created partitions only. The subset of partitions // that already exist (existingParts), will not generate notifications. if (!transactionalListeners.isEmpty()) { @@ -3313,10 +3299,10 @@ public Object run() throws Exception { } finally { if (!success) { ms.rollbackTransaction(); - for (Map.Entry e : addedPartitions.entrySet()) { + for (Map.Entry e : addedPartitions.entrySet()) { if (e.getValue()) { // we just created this directory - it's not a case of pre-creation, so we nuke. - wh.deleteDir(new Path(e.getKey().partition.getSd().getLocation()), true); + wh.deleteDir(new Path(e.getKey().location), true); } } @@ -3454,70 +3440,98 @@ private int add_partitions_pspec_core(RawStore ms, String catName, String dbName } firePreEvent(new PreAddPartitionEvent(tbl, partitionSpecProxy, this)); - List> partFutures = Lists.newArrayList(); - final Table table = tbl; - while(partitionIterator.hasNext()) { + Set partsToAdd = new HashSet<>(partitionSpecProxy.size()); + List partitionsToAdd = new ArrayList<>(partitionSpecProxy.size()); + while (partitionIterator.hasNext()) { + // Iterate through the partitions and validate them. If one of the partitions is + // incorrect, an exception will be thrown before the threads which create the partition + // folders are submitted. This way we can be sure that no partition or partition folder + // will be created if the list contains an invalid partition. final Partition part = partitionIterator.getCurrent(); if (!part.getTableName().equalsIgnoreCase(tblName) || !part.getDbName().equalsIgnoreCase(dbName)) { - throw new MetaException("Partition does not belong to target table " - + dbName + "." + tblName + ": " + part); + String errorMsg = String.format( + "Partition does not belong to target table %s.%s. It belongs to the table %s.%s : %s", + dbName, tblName, part.getDbName(), part.getTableName(), part.toString()); + throw new MetaException(errorMsg); } boolean shouldAdd = startAddPartition(ms, part, ifNotExists); if (!shouldAdd) { - LOG.info("Not adding partition " + part + " as it already exists"); + LOG.info("Not adding partition {} as it already exists", part); continue; } - final UserGroupInformation ugi; - try { - ugi = UserGroupInformation.getCurrentUser(); - } catch (IOException e) { - throw new RuntimeException(e); - } - - partFutures.add(threadPool.submit(new Callable() { - @Override public Partition call() throws Exception { - ugi.doAs(new PrivilegedExceptionAction() { - @Override - public Partition run() throws Exception { - try { - boolean madeDir = createLocationForAddedPartition(table, part); - if (addedPartitions.put(new PartValEqWrapperLite(part), madeDir) != null) { - // Technically, for ifNotExists case, we could insert one and discard the other - // because the first one now "exists", but it seems better to report the problem - // upstream as such a command doesn't make sense. - throw new MetaException("Duplicate partitions in the list: " + part); - } - initializeAddedPartition(table, part, madeDir); - } catch (MetaException e) { - throw new IOException(e.getMessage(), e); - } - return null; - } - }); - return part; - } - })); + if (!partsToAdd.add(new PartValEqWrapperLite(part))) { + // Technically, for ifNotExists case, we could insert one and discard the other + // because the first one now "exists", but it seems better to report the problem + // upstream as such a command doesn't make sense. + throw new MetaException("Duplicate partitions in the list: " + part); + } + + partitionsToAdd.add(part); partitionIterator.next(); } + final AtomicBoolean failureOccurred = new AtomicBoolean(false); + List> partFutures = new ArrayList<>(partitionsToAdd.size()); + final Table table = tbl; + + final UserGroupInformation ugi; try { - for (Future partFuture : partFutures) { + ugi = UserGroupInformation.getCurrentUser(); + } catch (IOException e) { + throw new RuntimeException(e); + } + + for (final Partition partition : partitionsToAdd) { + initializePartitionParameters(table, partition); + + partFutures.add(threadPool.submit(() -> { + if (failureOccurred.get()) { + return null; + } + ugi.doAs((PrivilegedExceptionAction) () -> { + try { + boolean madeDir = createLocationForAddedPartition(table, partition); + addedPartitions.put(new PartValEqWrapperLite(partition), madeDir); + initializeAddedPartition(table, partition, madeDir); + } catch (MetaException e) { + throw new IOException(e.getMessage(), e); + } + return null; + }); + return partition; + })); + } + + String errorMessage = null; + for (Future partFuture : partFutures) { + try { partFuture.get(); + } catch (InterruptedException | ExecutionException e) { + // If an exception is thrown in the execution of a task, set the failureOccurred flag to + // true. This flag is visible in the tasks and if its value is true, the partition + // folders won't be created. + // Then iterate through the remaining tasks and wait for them to finish. The tasks which + // are started before the flag got set will then finish creating the partition folders. + // The tasks which are started after the flag got set, won't create the partition + // folders, to avoid unnecessary work. + // This way it is sure that all tasks are finished, when entering the finally part where + // the partition folders are cleaned up. It won't happen that a task is still running + // when cleaning up the folders, so it is sure we won't have leftover folders. + // Canceling the other tasks would be also an option but during testing it turned out + // that it is not a trustworthy solution to avoid leftover folders. + failureOccurred.compareAndSet(false, true); + errorMessage = e.getMessage(); } - } catch (InterruptedException | ExecutionException e) { - // cancel other tasks - for (Future partFuture : partFutures) { - partFuture.cancel(true); - } - throw new MetaException(e.getMessage()); } - success = ms.addPartitions(catName, dbName, tblName, partitionSpecProxy, ifNotExists); - //setting success to false to make sure that if the listener fails, rollback happens. - success = false; + if (failureOccurred.get()) { + throw new MetaException(errorMessage); + } + + ms.addPartitions(catName, dbName, tblName, partitionSpecProxy, ifNotExists); if (!transactionalListeners.isEmpty()) { transactionalListenerResponses = @@ -3626,6 +3640,16 @@ private void initializeAddedPartition( part.getParameters().get(hive_metastoreConstants.DDL_TIME) == null) { part.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(time)); } + } + + private void initializePartitionParameters(final Table tbl, final Partition part) + throws MetaException { + initializePartitionParameters(tbl, + new PartitionSpecProxy.SimplePartitionWrapperIterator(part)); + } + + private void initializePartitionParameters(final Table tbl, + final PartitionSpecProxy.PartitionIterator part) throws MetaException { // Inherit table properties into partition properties. Map tblParams = tbl.getParameters(); @@ -3667,6 +3691,7 @@ private Partition add_partition_core(final RawStore ms, boolean madeDir = createLocationForAddedPartition(tbl, part); try { initializeAddedPartition(tbl, part, madeDir); + initializePartitionParameters(tbl, part); success = ms.addPartition(part); } finally { if (!success && madeDir) { diff --git standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitions.java standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitions.java index 8555eee..f8497c7 100644 --- standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitions.java +++ standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitions.java @@ -21,7 +21,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.IMetaStoreClient; @@ -434,20 +436,39 @@ public void testAddPartitionNullLocationInTableToo() throws Exception { @Test(expected = MetaException.class) public void testAddPartitionForView() throws Exception { - Table table = new TableBuilder() - .setDbName(DB_NAME) - .setTableName(TABLE_NAME) - .setType("VIRTUAL_VIEW") - .addCol("test_id", "int", "test col id") - .addCol("test_value", DEFAULT_COL_TYPE, "test col value") - .addPartCol(YEAR_COL_NAME, DEFAULT_COL_TYPE) - .setLocation(null) - .create(client, metaStore.getConf()); - Partition partition = buildPartition(DB_NAME, TABLE_NAME, DEFAULT_YEAR_VALUE); + String tableName = "test_add_partition_view"; + createView(tableName); + Partition partition = buildPartition(DB_NAME, tableName, DEFAULT_YEAR_VALUE); client.add_partition(partition); } @Test + public void testAddPartitionsForViewNullPartLocation() throws Exception { + + String tableName = "test_add_partition_view"; + createView(tableName); + Partition partition = buildPartition(DB_NAME, tableName, DEFAULT_YEAR_VALUE); + partition.getSd().setLocation(null); + List partitions = Lists.newArrayList(partition); + client.add_partitions(partitions); + Partition part = client.getPartition(DB_NAME, tableName, "year=2017"); + Assert.assertNull(part.getSd().getLocation()); + } + + @Test + public void testAddPartitionsForViewNullPartSd() throws Exception { + + String tableName = "test_add_partition_view"; + createView(tableName); + Partition partition = buildPartition(DB_NAME, tableName, DEFAULT_YEAR_VALUE); + partition.setSd(null); + List partitions = Lists.newArrayList(partition); + client.add_partitions(partitions); + Partition part = client.getPartition(DB_NAME, tableName, "year=2017"); + Assert.assertNull(part.getSd()); + } + + @Test public void testAddPartitionForExternalTable() throws Exception { String tableName = "part_add_ext_table"; @@ -736,20 +757,26 @@ public void testAddPartitionsDifferentDBs() throws Exception { client.dropDatabase("parttestdb2", true, true, true); } - @Test(expected = MetaException.class) + @Test public void testAddPartitionsDuplicateInTheList() throws Exception { createTable(); + List partitions = buildPartitions(DB_NAME, TABLE_NAME, + Lists.newArrayList("2014", "2015", "2017", "2017", "2018", "2019")); - Partition partition1 = buildPartition(DB_NAME, TABLE_NAME, "2017"); - Partition partition2 = buildPartition(DB_NAME, TABLE_NAME, "2016"); - Partition partition3 = buildPartition(DB_NAME, TABLE_NAME, "2017"); + try { + client.add_partitions(partitions); + Assert.fail("MetaException should have happened."); + } catch (MetaException e) { + // Expected exception + } - List partitions = new ArrayList<>(); - partitions.add(partition1); - partitions.add(partition2); - partitions.add(partition3); - client.add_partitions(partitions); + List parts = client.listPartitions(DB_NAME, TABLE_NAME, MAX); + Assert.assertNotNull(parts); + Assert.assertTrue(parts.isEmpty()); + for (Partition partition : partitions) { + Assert.assertFalse(metaStore.isPathExists(new Path(partition.getSd().getLocation()))); + } } @Test @@ -774,22 +801,32 @@ public void testAddPartitionsWithSameNameInTheListCaseSensitive() throws Excepti Assert.assertTrue(parts.contains("year=THIS")); } - @Test(expected = AlreadyExistsException.class) + @Test public void testAddPartitionsAlreadyExists() throws Exception { createTable(); - Partition partition = buildPartition(DB_NAME, TABLE_NAME, "2017"); + String tableLocation = metaStore.getWarehouseRoot() + "/" + TABLE_NAME; + Partition partition = + buildPartition(DB_NAME, TABLE_NAME, "2016", tableLocation + "/year=2016a"); client.add_partition(partition); - Partition partition1 = buildPartition(DB_NAME, TABLE_NAME, "2015"); - Partition partition2 = buildPartition(DB_NAME, TABLE_NAME, "2017"); - Partition partition3 = buildPartition(DB_NAME, TABLE_NAME, "2016"); + List partitions = buildPartitions(DB_NAME, TABLE_NAME, + Lists.newArrayList("2014", "2015", "2016", "2017", "2018")); - List partitions = new ArrayList<>(); - partitions.add(partition1); - partitions.add(partition2); - partitions.add(partition3); - client.add_partitions(partitions); + try { + client.add_partitions(partitions); + Assert.fail("AlreadyExistsException should have happened."); + } catch (AlreadyExistsException e) { + // Expected exception + } + + List parts = client.listPartitions(DB_NAME, TABLE_NAME, MAX); + Assert.assertNotNull(parts); + Assert.assertEquals(1, parts.size()); + Assert.assertEquals(partition.getValues(), parts.get(0).getValues()); + for (Partition part : partitions) { + Assert.assertFalse(metaStore.isPathExists(new Path(part.getSd().getLocation()))); + } } @Test(expected = MetaException.class) @@ -877,16 +914,24 @@ public void testAddPartitionsOneInvalid() throws Exception { createTable(); String tableLocation = metaStore.getWarehouseRoot() + "/" + TABLE_NAME; - Partition partition1 = buildPartition(DB_NAME, TABLE_NAME, "2016", tableLocation + "/year=2016"); - Partition partition2 = buildPartition(DB_NAME, TABLE_NAME, "2017", tableLocation + "/year=2017"); + Partition partition1 = + buildPartition(DB_NAME, TABLE_NAME, "2016", tableLocation + "/year=2016"); + Partition partition2 = + buildPartition(DB_NAME, TABLE_NAME, "2017", tableLocation + "/year=2017"); Partition partition3 = buildPartition(Lists.newArrayList("2015", "march"), getYearAndMonthPartCols(), 1); partition3.getSd().setLocation(tableLocation + "/year=2015/month=march"); + Partition partition4 = + buildPartition(DB_NAME, TABLE_NAME, "2018", tableLocation + "/year=2018"); + Partition partition5 = + buildPartition(DB_NAME, TABLE_NAME, "2019", tableLocation + "/year=2019"); List partitions = new ArrayList<>(); partitions.add(partition1); partitions.add(partition2); partitions.add(partition3); + partitions.add(partition4); + partitions.add(partition5); try { client.add_partitions(partitions); @@ -898,15 +943,9 @@ public void testAddPartitionsOneInvalid() throws Exception { List parts = client.listPartitions(DB_NAME, TABLE_NAME, MAX); Assert.assertNotNull(parts); Assert.assertTrue(parts.isEmpty()); - // TODO: This does not work correctly. None of the partitions is created, but the folder - // for the first two is created. It is because in HiveMetaStore.add_partitions_core when - // going through the partitions, the first two are already put and started in the thread - // pool when the exception occurs in the third one. When the exception occurs, we go to - // the finally part, but the map can be empty (it depends on the progress of the other - // threads) so the folders won't be deleted. -// Assert.assertFalse(metaStore.isPathExists(new Path(tableLocation + "/year=2016"))); -// Assert.assertFalse(metaStore.isPathExists(new Path(tableLocation + "/year=2017"))); - Assert.assertFalse(metaStore.isPathExists(new Path(tableLocation + "/year=2015/month=march"))); + for (Partition part : partitions) { + Assert.assertFalse(metaStore.isPathExists(new Path(part.getSd().getLocation()))); + } } @Test(expected = MetaException.class) @@ -1036,16 +1075,9 @@ public void testAddPartitionsNullLocationInTableToo() throws Exception { @Test(expected=MetaException.class) public void testAddPartitionsForView() throws Exception { - Table table = new TableBuilder() - .setDbName(DB_NAME) - .setTableName(TABLE_NAME) - .setType("VIRTUAL_VIEW") - .addCol("test_id", "int", "test col id") - .addCol("test_value", "string", "test col value") - .addPartCol(YEAR_COL_NAME, DEFAULT_COL_TYPE) - .setLocation(null) - .create(client, metaStore.getConf()); - Partition partition = buildPartition(DB_NAME, TABLE_NAME, DEFAULT_YEAR_VALUE); + String tableName = "test_add_partition_view"; + createView(tableName); + Partition partition = buildPartition(DB_NAME, tableName, DEFAULT_YEAR_VALUE); List partitions = Lists.newArrayList(partition); client.add_partitions(partitions); } @@ -1169,6 +1201,70 @@ public void testAddPartitionsEmptyValue() throws Exception { Assert.assertEquals("year=__HIVE_DEFAULT_PARTITION__", partitionNames.get(0)); } + @Test + public void testAddPartitionsInvalidLocation() throws Exception { + + createTable(); + String tableLocation = metaStore.getWarehouseRoot() + "/" + TABLE_NAME; + Map valuesAndLocations = new HashMap<>(); + valuesAndLocations.put("2014", tableLocation + "/year=2014"); + valuesAndLocations.put("2015", tableLocation + "/year=2015"); + valuesAndLocations.put("2016", "invalidhost:80000/wrongfolder"); + valuesAndLocations.put("2017", tableLocation + "/year=2017"); + valuesAndLocations.put("2018", tableLocation + "/year=2018"); + List partitions = buildPartitions(DB_NAME, TABLE_NAME, valuesAndLocations); + + try { + client.add_partitions(partitions); + Assert.fail("MetaException should have happened."); + } catch (MetaException e) { + + } + + List parts = client.listPartitions(DB_NAME, TABLE_NAME, MAX); + Assert.assertNotNull(parts); + Assert.assertTrue(parts.isEmpty()); + for (Partition partition : partitions) { + if (!"invalidhost:80000/wrongfolder".equals(partition.getSd().getLocation())) { + Assert.assertFalse(metaStore.isPathExists(new Path(partition.getSd().getLocation()))); + } + } + } + + @Test + public void testAddPartitionsMoreThanThreadCountsOneFails() throws Exception { + + createTable(); + String tableLocation = metaStore.getWarehouseRoot() + "/" + TABLE_NAME; + + List partitions = new ArrayList<>(); + for (int i = 0; i < 50; i++) { + String value = String.valueOf(2000 + i); + String location = tableLocation + "/year=" + value; + if (i == 30) { + location = "invalidhost:80000/wrongfolder"; + } + Partition partition = buildPartition(DB_NAME, TABLE_NAME, value, location); + partitions.add(partition); + } + + try { + client.add_partitions(partitions); + Assert.fail("MetaException should have happened."); + } catch (MetaException e) { + + } + + List parts = client.listPartitions(DB_NAME, TABLE_NAME, MAX); + Assert.assertNotNull(parts); + Assert.assertTrue(parts.isEmpty()); + for (Partition partition : partitions) { + if (!"invalidhost:80000/wrongfolder".equals(partition.getSd().getLocation())) { + Assert.assertFalse(metaStore.isPathExists(new Path(partition.getSd().getLocation()))); + } + } + } + // Tests for List add_partitions(List partitions, // boolean ifNotExists, boolean needResults) method @@ -1531,4 +1627,43 @@ private void verifyPartitionAttributesDefaultValues(Partition partition, String Assert.assertTrue("Per default the skewedInfo column value location map should be empty.", skewedInfo.getSkewedColValueLocationMaps().isEmpty()); } + + private List buildPartitions(String dbName, String tableName, List values) + throws MetaException { + + String tableLocation = metaStore.getWarehouseRoot() + "/" + tableName; + List partitions = new ArrayList<>(); + + for (String value : values) { + Partition partition = + buildPartition(dbName, tableName, value, tableLocation + "/year=" + value); + partitions.add(partition); + } + return partitions; + } + + private List buildPartitions(String dbName, String tableName, + Map valuesAndLocations) throws MetaException { + + List partitions = new ArrayList<>(); + + for (Map.Entry valueAndLocation : valuesAndLocations.entrySet()) { + Partition partition = + buildPartition(dbName, tableName, valueAndLocation.getKey(), valueAndLocation.getValue()); + partitions.add(partition); + } + return partitions; + } + + private void createView(String tableName) throws Exception { + new TableBuilder() + .setDbName(DB_NAME) + .setTableName(tableName) + .setType("VIRTUAL_VIEW") + .addCol("test_id", "int", "test col id") + .addCol("test_value", "string", "test col value") + .addPartCol(YEAR_COL_NAME, DEFAULT_COL_TYPE) + .setLocation(null) + .create(client, metaStore.getConf()); + } } diff --git standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitionsFromPartSpec.java standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitionsFromPartSpec.java index b32954f..fc0c60f 100644 --- standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitionsFromPartSpec.java +++ standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitionsFromPartSpec.java @@ -519,31 +519,57 @@ public void testAddPartitionSpecWithSharedSDWithoutRelativePath() throws Excepti Assert.assertTrue(metaStore.isPathExists(new Path(part.getSd().getLocation()))); } - @Test(expected = AlreadyExistsException.class) + @Test public void testAddPartitionSpecPartAlreadyExists() throws Exception { createTable(); - Partition partition = buildPartition(DB_NAME, TABLE_NAME, DEFAULT_YEAR_VALUE); + String tableLocation = metaStore.getWarehouseRoot() + "/" + TABLE_NAME; + Partition partition = + buildPartition(DB_NAME, TABLE_NAME, "2016", tableLocation + "/year=2016a"); client.add_partition(partition); - Partition newPartition = buildPartition(DB_NAME, TABLE_NAME, DEFAULT_YEAR_VALUE); + List partitions = buildPartitions(DB_NAME, TABLE_NAME, + Lists.newArrayList("2014", "2015", "2016", "2017", "2018")); PartitionSpecProxy partitionSpecProxy = - buildPartitionSpec(DB_NAME, TABLE_NAME, null, Lists.newArrayList(newPartition)); - client.add_partitions_pspec(partitionSpecProxy); + buildPartitionSpec(DB_NAME, TABLE_NAME, null, partitions); + + try { + client.add_partitions_pspec(partitionSpecProxy); + Assert.fail("AlreadyExistsException should have happened."); + } catch (AlreadyExistsException e) { + // Expected exception + } + + List parts = client.listPartitions(DB_NAME, TABLE_NAME, MAX); + Assert.assertNotNull(parts); + Assert.assertEquals(1, parts.size()); + Assert.assertEquals(partition.getValues(), parts.get(0).getValues()); + for (Partition part : partitions) { + Assert.assertFalse(metaStore.isPathExists(new Path(part.getSd().getLocation()))); + } } - @Test(expected = MetaException.class) + @Test public void testAddPartitionSpecPartDuplicateInSpec() throws Exception { createTable(); - Partition partition1 = buildPartition(DB_NAME, TABLE_NAME, DEFAULT_YEAR_VALUE); - Partition partition2 = buildPartition(DB_NAME, TABLE_NAME, DEFAULT_YEAR_VALUE); - List partitions = new ArrayList<>(); - partitions.add(partition1); - partitions.add(partition2); + List partitions = buildPartitions(DB_NAME, TABLE_NAME, + Lists.newArrayList("2014", "2015", "2017", "2017", "2018", "2019")); PartitionSpecProxy partitionSpecProxy = buildPartitionSpec(DB_NAME, TABLE_NAME, null, partitions); - client.add_partitions_pspec(partitionSpecProxy); + try { + client.add_partitions_pspec(partitionSpecProxy); + Assert.fail("MetaException should have happened."); + } catch (MetaException e) { + // Expected exception + } + + List parts = client.listPartitions(DB_NAME, TABLE_NAME, MAX); + Assert.assertNotNull(parts); + Assert.assertTrue(parts.isEmpty()); + for (Partition partition : partitions) { + Assert.assertFalse(metaStore.isPathExists(new Path(partition.getSd().getLocation()))); + } } @Test(expected = MetaException.class) @@ -672,20 +698,40 @@ public void testAddPartitionSpecEmptyLocationInTableToo() throws Exception { @Test(expected=MetaException.class) public void testAddPartitionSpecForView() throws Exception { - Table table = new TableBuilder() - .setDbName(DB_NAME) - .setTableName(TABLE_NAME) - .setType("VIRTUAL_VIEW") - .addCol("test_id", "int", "test col id") - .addCol("test_value", DEFAULT_COL_TYPE, "test col value") - .addPartCol(YEAR_COL_NAME, DEFAULT_COL_TYPE) - .setLocation(null) - .create(client, metaStore.getConf()); + String tableName = "test_add_partition_view"; + createView(tableName); + Partition partition = buildPartition(DB_NAME, tableName, DEFAULT_YEAR_VALUE); + PartitionSpecProxy partitionSpecProxy = + buildPartitionSpec(DB_NAME, tableName, null, Lists.newArrayList(partition)); + client.add_partitions_pspec(partitionSpecProxy); + } - Partition partition = buildPartition(DB_NAME, TABLE_NAME, DEFAULT_YEAR_VALUE); + @Test + public void testAddPartitionSpecForViewNullPartLocation() throws Exception { + + String tableName = "test_add_partition_view"; + createView(tableName); + Partition partition = buildPartition(DB_NAME, tableName, DEFAULT_YEAR_VALUE); + partition.getSd().setLocation(null); PartitionSpecProxy partitionSpecProxy = - buildPartitionSpec(DB_NAME, TABLE_NAME, null, Lists.newArrayList(partition)); + buildPartitionSpec(DB_NAME, tableName, null, Lists.newArrayList(partition)); client.add_partitions_pspec(partitionSpecProxy); + Partition part = client.getPartition(DB_NAME, tableName, "year=2017"); + Assert.assertNull(part.getSd().getLocation()); + } + + @Test + public void testAddPartitionsForViewNullPartSd() throws Exception { + + String tableName = "test_add_partition_view"; + createView(tableName); + Partition partition = buildPartition(DB_NAME, tableName, DEFAULT_YEAR_VALUE); + partition.setSd(null); + PartitionSpecProxy partitionSpecProxy = + buildPartitionSpec(DB_NAME, tableName, null, Lists.newArrayList(partition)); + client.add_partitions_pspec(partitionSpecProxy); + Partition part = client.getPartition(DB_NAME, tableName, "year=2017"); + Assert.assertNull(part.getSd()); } @Test @@ -790,33 +836,104 @@ public void testAddPartitionSpecWithSharedSDNoRelativePath() throws Exception { public void testAddPartitionSpecOneInvalid() throws Exception { createTable(); - Partition partition1 = buildPartition(DB_NAME, TABLE_NAME, "2016"); - Partition partition2 = buildPartition(DB_NAME, TABLE_NAME, "2017"); + String tableLocation = metaStore.getWarehouseRoot() + "/" + TABLE_NAME; + Partition partition1 = + buildPartition(DB_NAME, TABLE_NAME, "2016", tableLocation + "/year=2016"); + Partition partition2 = + buildPartition(DB_NAME, TABLE_NAME, "2017", tableLocation + "/year=2017"); Partition partition3 = buildPartition(Lists.newArrayList("2015", "march"), getYearAndMonthPartCols(), 1); - partition3.getSd().setLocation(metaStore.getWarehouseRoot() + "/addparttest"); - List partitions = Lists.newArrayList(partition1, partition2, partition3); + partition3.getSd().setLocation(tableLocation + "/year=2015/month=march"); + Partition partition4 = + buildPartition(DB_NAME, TABLE_NAME, "2018", tableLocation + "/year=2018"); + Partition partition5 = + buildPartition(DB_NAME, TABLE_NAME, "2019", tableLocation + "/year=2019"); + List partitions = + Lists.newArrayList(partition1, partition2, partition3, partition4, partition5); + PartitionSpecProxy partitionSpecProxy = + buildPartitionSpec(DB_NAME, TABLE_NAME, null, partitions); + + try { + client.add_partitions_pspec(partitionSpecProxy); + Assert.fail("MetaException should have happened."); + } catch (MetaException e) { + // Expected exception + } + + List parts = client.listPartitions(DB_NAME, TABLE_NAME, MAX); + Assert.assertNotNull(parts); + Assert.assertTrue(parts.isEmpty()); + for (Partition part : partitions) { + Assert.assertFalse(metaStore.isPathExists(new Path(part.getSd().getLocation()))); + } + } + + @Test + public void testAddPartitionSpecInvalidLocation() throws Exception { + + createTable(); + String tableLocation = metaStore.getWarehouseRoot() + "/" + TABLE_NAME; + Map valuesAndLocations = new HashMap<>(); + valuesAndLocations.put("2014", tableLocation + "/year=2014"); + valuesAndLocations.put("2015", tableLocation + "/year=2015"); + valuesAndLocations.put("2016", "invalidhost:80000/wrongfolder"); + valuesAndLocations.put("2017", tableLocation + "/year=2017"); + valuesAndLocations.put("2018", tableLocation + "/year=2018"); + List partitions = buildPartitions(DB_NAME, TABLE_NAME, valuesAndLocations); + PartitionSpecProxy partitionSpecProxy = + buildPartitionSpec(DB_NAME, TABLE_NAME, null, partitions); + + try { + client.add_partitions_pspec(partitionSpecProxy); + Assert.fail("MetaException should have happened."); + } catch (MetaException e) { + // Expected exception + } + + List parts = client.listPartitions(DB_NAME, TABLE_NAME, MAX); + Assert.assertNotNull(parts); + Assert.assertTrue(parts.isEmpty()); + for (Partition partition : partitions) { + if (!"invalidhost:80000/wrongfolder".equals(partition.getSd().getLocation())) { + Assert.assertFalse(metaStore.isPathExists(new Path(partition.getSd().getLocation()))); + } + } + } + + @Test + public void testAddPartitionSpecMoreThanThreadCountsOneFails() throws Exception { + + createTable(); + String tableLocation = metaStore.getWarehouseRoot() + "/" + TABLE_NAME; + + List partitions = new ArrayList<>(); + for (int i = 0; i < 50; i++) { + String value = String.valueOf(2000 + i); + String location = tableLocation + "/year=" + value; + if (i == 30) { + location = "invalidhost:80000/wrongfolder"; + } + Partition partition = buildPartition(DB_NAME, TABLE_NAME, value, location); + partitions.add(partition); + } PartitionSpecProxy partitionSpecProxy = buildPartitionSpec(DB_NAME, TABLE_NAME, null, partitions); + try { client.add_partitions_pspec(partitionSpecProxy); - Assert.fail("MetaException should have occurred."); + Assert.fail("MetaException should have happened."); } catch (MetaException e) { - // This is expected + // Expected exception } List parts = client.listPartitions(DB_NAME, TABLE_NAME, MAX); Assert.assertNotNull(parts); Assert.assertTrue(parts.isEmpty()); - // TODO: This does not work correctly. None of the partitions is created, but the folder - // for the first two is created. It is because in HiveMetaStore.add_partitions_core when - // going through the partitions, the first two are already put and started in the thread - // pool when the exception occurs in the third one. - // When the exception occurs, we go to the finally part, but the map can be empty - // (it depends on the progress of the other threads) so the folders won't be deleted. - // Assert.assertTrue(metaStore.isPathExists(new Path(partition1.getSd().getLocation()))); - // Assert.assertTrue(metaStore.isPathExists(new Path(partition2.getSd().getLocation()))); - // Assert.assertTrue(metaStore.isPathExists(new Path(partition3.getSd().getLocation()))); + for (Partition partition : partitions) { + if (!"invalidhost:80000/wrongfolder".equals(partition.getSd().getLocation())) { + Assert.assertFalse(metaStore.isPathExists(new Path(partition.getSd().getLocation()))); + } + } } // Helper methods @@ -1049,4 +1166,43 @@ private void verifyPartitionSharedSD(Table table, String name, List valu sd.getLocation()); Assert.assertTrue(metaStore.isPathExists(new Path(sd.getLocation()))); } + + private List buildPartitions(String dbName, String tableName, List values) + throws MetaException { + + String tableLocation = metaStore.getWarehouseRoot() + "/" + tableName; + List partitions = new ArrayList<>(); + + for (String value : values) { + Partition partition = + buildPartition(dbName, tableName, value, tableLocation + "/year=" + value); + partitions.add(partition); + } + return partitions; + } + + private List buildPartitions(String dbName, String tableName, + Map valuesAndLocations) throws MetaException { + + List partitions = new ArrayList<>(); + + for (Map.Entry valueAndLocation : valuesAndLocations.entrySet()) { + Partition partition = + buildPartition(dbName, tableName, valueAndLocation.getKey(), valueAndLocation.getValue()); + partitions.add(partition); + } + return partitions; + } + + private void createView(String tableName) throws Exception { + new TableBuilder() + .setDbName(DB_NAME) + .setTableName(tableName) + .setType("VIRTUAL_VIEW") + .addCol("test_id", "int", "test col id") + .addCol("test_value", "string", "test col value") + .addPartCol(YEAR_COL_NAME, DEFAULT_COL_TYPE) + .setLocation(null) + .create(client, metaStore.getConf()); + } }