diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 662de9a..f4fd3e4 100644 --- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -2798,54 +2798,6 @@ public boolean equals(Object obj) { } } - private static class PartValEqWrapperLite { - List values; - String location; - - PartValEqWrapperLite(Partition partition) { - this.values = partition.isSetValues()? partition.getValues() : null; - this.location = partition.getSd().getLocation(); - } - - @Override - public int hashCode() { - return values == null ? 0 : values.hashCode(); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null || !(obj instanceof PartValEqWrapperLite)) { - return false; - } - - List lhsValues = this.values; - List rhsValues = ((PartValEqWrapperLite)obj).values; - - if (lhsValues == null || rhsValues == null) { - return lhsValues == rhsValues; - } - - if (lhsValues.size() != rhsValues.size()) { - return false; - } - - for (int i=0; i add_partitions_core(final RawStore ms, String dbName, String tblName, List parts, final boolean ifNotExists) throws TException { @@ -2871,77 +2823,103 @@ public boolean equals(Object obj) { firePreEvent(new PreAddPartitionEvent(tbl, parts, this)); } - List> partFutures = Lists.newArrayList(); - final Table table = tbl; + Set partitionsToAdd = new HashSet<>(parts.size()); for (final Partition part : parts) { + // Iterate through the partitions and validate them. If one of the partitions is + // incorrect, an exception will be thrown before the threads which create the partition + // folders are submitted. This way we can be sure that no partition and no partition + // folder will be created if the list contains an invalid partition. if (!part.getTableName().equals(tblName) || !part.getDbName().equals(dbName)) { - throw new MetaException("Partition does not belong to target table " - + dbName + "." + tblName + ": " + part); + String errorMsg = String.format( + "Partition does not belong to target table %s.%s. It belongs to the table %s.%s : %s", + dbName, tblName, part.getDbName(), part.getTableName(), part.toString()); + throw new MetaException(errorMsg); } boolean shouldAdd = startAddPartition(ms, part, ifNotExists); if (!shouldAdd) { existingParts.add(part); - LOG.info("Not adding partition " + part + " as it already exists"); + LOG.info("Not adding partition {} as it already exists", part); continue; } - final UserGroupInformation ugi; - try { - ugi = UserGroupInformation.getCurrentUser(); - } catch (IOException e) { - throw new RuntimeException(e); + PartValEqWrapper partValWrapper = new PartValEqWrapper(part); + if (partitionsToAdd.contains(partValWrapper)) { + // Technically, for ifNotExists case, we could insert one and discard the other + // because the first one now "exists", but it seems better to report the problem + // upstream as such a command doesn't make sense. + throw new MetaException("Duplicate partitions in the list: " + part); } - partFutures.add(threadPool.submit(new Callable() { - @Override - public Partition call() throws Exception { - ugi.doAs(new PrivilegedExceptionAction() { - @Override - public Object run() throws Exception { - try { - boolean madeDir = createLocationForAddedPartition(table, part); - if (addedPartitions.put(new PartValEqWrapper(part), madeDir) != null) { - // Technically, for ifNotExists case, we could insert one and discard the other - // because the first one now "exists", but it seems better to report the problem - // upstream as such a command doesn't make sense. - throw new MetaException("Duplicate partitions in the list: " + part); - } - initializeAddedPartition(table, part, madeDir); - } catch (MetaException e) { - throw new IOException(e.getMessage(), e); - } - return null; - } - }); - return part; + partitionsToAdd.add(partValWrapper); + } + + final AtomicBoolean failureOccurred = new AtomicBoolean(false); + final Table table = tbl; + List> partFutures = new ArrayList<>(partitionsToAdd.size()); + + final UserGroupInformation ugi; + try { + ugi = UserGroupInformation.getCurrentUser(); + } catch (IOException e) { + throw new RuntimeException(e); + } + + for (final PartValEqWrapper partValWrapper : partitionsToAdd) { + Partition partition = partValWrapper.partition; + initializePartitionParameters(table, partition); + + partFutures.add(threadPool.submit(() -> { + if (failureOccurred.get()) { + return null; } + ugi.doAs((PrivilegedExceptionAction) () -> { + try { + boolean madeDir = createLocationForAddedPartition(table, partition); + addedPartitions.put(partValWrapper, madeDir); + initializeAddedPartition(table, partition, madeDir); + } catch (MetaException e) { + throw new IOException(e.getMessage(), e); + } + return null; + }); + return partition; })); } - try { - for (Future partFuture : partFutures) { + String errorMessage = null; + for (Future partFuture : partFutures) { + try { Partition part = partFuture.get(); - if (part != null) { + if (part != null && !failureOccurred.get()) { newParts.add(part); } + } catch (InterruptedException | ExecutionException e) { + // If an exception is thrown in the execution of a task, set the failureOccurred flag to + // true. This flag is visible in the tasks and if its value is true, the partition + // folders won't be created. + // Then iterate through the remaining tasks and wait for them to finish. The tasks which + // are started before the flag got set will then finish creating the partition folders. + // The tasks which are started after the flag got set, won't create the partition + // folders, to avoid unnecessary work. + // This way it is sure that all tasks are finished, when entering the finally part where + // the partition folders are cleaned up. It won't happen that a task is still running + // when cleaning up the folders, so it is sure we won't have leftover folders. + // Canceling the other tasks would be also an option but during testing it turned out + // that it is not a trustworthy solution to avoid leftover folders. + failureOccurred.compareAndSet(false, true); + errorMessage = e.getMessage(); } - } catch (InterruptedException | ExecutionException e) { - // cancel other tasks - for (Future partFuture : partFutures) { - partFuture.cancel(true); - } - throw new MetaException(e.getMessage()); + } + + if (failureOccurred.get()) { + throw new MetaException(errorMessage); } if (!newParts.isEmpty()) { - success = ms.addPartitions(dbName, tblName, newParts); - } else { - success = true; + ms.addPartitions(dbName, tblName, newParts); } - // Setting success to false to make sure that if the listener fails, rollback happens. - success = false; // Notification is generated for newly created partitions only. The subset of partitions // that already exist (existingParts), will not generate notifications. if (!transactionalListeners.isEmpty()) { @@ -3063,8 +3041,8 @@ private int add_partitions_pspec_core( throws TException { boolean success = false; // Ensures that the list doesn't have dups, and keeps track of directories we have created. - final Map addedPartitions = - Collections.synchronizedMap(new HashMap()); + final Map addedPartitions = + Collections.synchronizedMap(new HashMap()); PartitionSpecProxy partitionSpecProxy = PartitionSpecProxy.Factory.get(partSpecs); final PartitionSpecProxy.PartitionIterator partitionIterator = partitionSpecProxy .getPartitionIterator(); @@ -3079,70 +3057,99 @@ private int add_partitions_pspec_core( } firePreEvent(new PreAddPartitionEvent(tbl, partitionSpecProxy, this)); - List> partFutures = Lists.newArrayList(); - final Table table = tbl; - while(partitionIterator.hasNext()) { + Set partitionsToAdd = new HashSet<>(partitionSpecProxy.size()); + while (partitionIterator.hasNext()) { + // Iterate through the partitions and validate them. If one of the partitions is + // incorrect, an exception will be thrown before the threads which create the partition + // folders are submitted. This way we can be sure that no partition or partition folder + // will be created if the list contains an invalid partition. final Partition part = partitionIterator.getCurrent(); if (!part.getTableName().equals(tblName) || !part.getDbName().equals(dbName)) { - throw new MetaException("Partition does not belong to target table " - + dbName + "." + tblName + ": " + part); + String errorMsg = String.format( + "Partition does not belong to target table %s.%s. It belongs to the table %s.%s : %s", + dbName, tblName, part.getDbName(), part.getTableName(), part.toString()); + throw new MetaException(errorMsg); } boolean shouldAdd = startAddPartition(ms, part, ifNotExists); if (!shouldAdd) { - LOG.info("Not adding partition " + part + " as it already exists"); + LOG.info("Not adding partition {} as it already exists", part); continue; } - final UserGroupInformation ugi; - try { - ugi = UserGroupInformation.getCurrentUser(); - } catch (IOException e) { - throw new RuntimeException(e); + PartValEqWrapper partValWrapper = new PartValEqWrapper(part); + if (partitionsToAdd.contains(partValWrapper)) { + // Technically, for ifNotExists case, we could insert one and discard the other + // because the first one now "exists", but it seems better to report the problem + // upstream as such a command doesn't make sense. + throw new MetaException("Duplicate partitions in the list: " + part); } - partFutures.add(threadPool.submit(new Callable() { - @Override public Partition call() throws Exception { - ugi.doAs(new PrivilegedExceptionAction() { - @Override - public Partition run() throws Exception { - try { - boolean madeDir = createLocationForAddedPartition(table, part); - if (addedPartitions.put(new PartValEqWrapperLite(part), madeDir) != null) { - // Technically, for ifNotExists case, we could insert one and discard the other - // because the first one now "exists", but it seems better to report the problem - // upstream as such a command doesn't make sense. - throw new MetaException("Duplicate partitions in the list: " + part); - } - initializeAddedPartition(table, part, madeDir); - } catch (MetaException e) { - throw new IOException(e.getMessage(), e); - } - return null; - } - }); - return part; - } - })); + partitionsToAdd.add(partValWrapper); partitionIterator.next(); } + final AtomicBoolean failureOccurred = new AtomicBoolean(false); + List> partFutures = new ArrayList<>(partitionsToAdd.size()); + final Table table = tbl; + + final UserGroupInformation ugi; try { - for (Future partFuture : partFutures) { + ugi = UserGroupInformation.getCurrentUser(); + } catch (IOException e) { + throw new RuntimeException(e); + } + + for (final PartValEqWrapper partValWrapper : partitionsToAdd) { + Partition partition = partValWrapper.partition; + initializePartitionParameters(table, partition); + + partFutures.add(threadPool.submit(() -> { + if (failureOccurred.get()) { + return null; + } + ugi.doAs((PrivilegedExceptionAction) () -> { + try { + boolean madeDir = createLocationForAddedPartition(table, partition); + addedPartitions.put(partValWrapper, madeDir); + initializeAddedPartition(table, partition, madeDir); + } catch (MetaException e) { + throw new IOException(e.getMessage(), e); + } + return null; + }); + return partition; + })); + } + + String errorMessage = null; + for (Future partFuture : partFutures) { + try { partFuture.get(); + } catch (InterruptedException | ExecutionException e) { + // If an exception is thrown in the execution of a task, set the failureOccurred flag to + // true. This flag is visible in the tasks and if its value is true, the partition + // folders won't be created. + // Then iterate through the remaining tasks and wait for them to finish. The tasks which + // are started before the flag got set will then finish creating the partition folders. + // The tasks which are started after the flag got set, won't create the partition + // folders, to avoid unnecessary work. + // This way it is sure that all tasks are finished, when entering the finally part where + // the partition folders are cleaned up. It won't happen that a task is still running + // when cleaning up the folders, so it is sure we won't have leftover folders. + // Canceling the other tasks would be also an option but during testing it turned out + // that it is not a trustworthy solution to avoid leftover folders. + failureOccurred.compareAndSet(false, true); + errorMessage = e.getMessage(); } - } catch (InterruptedException | ExecutionException e) { - // cancel other tasks - for (Future partFuture : partFutures) { - partFuture.cancel(true); - } - throw new MetaException(e.getMessage()); } - success = ms.addPartitions(dbName, tblName, partitionSpecProxy, ifNotExists); - //setting success to false to make sure that if the listener fails, rollback happens. - success = false; + if (failureOccurred.get()) { + throw new MetaException(errorMessage); + } + + ms.addPartitions(dbName, tblName, partitionSpecProxy, ifNotExists); if (!transactionalListeners.isEmpty()) { transactionalListenerResponses = @@ -3156,10 +3163,10 @@ public Partition run() throws Exception { } finally { if (!success) { ms.rollbackTransaction(); - for (Map.Entry e : addedPartitions.entrySet()) { + for (Map.Entry e : addedPartitions.entrySet()) { if (e.getValue()) { // we just created this directory - it's not a case of pre-creation, so we nuke. - wh.deleteDir(new Path(e.getKey().location), true); + wh.deleteDir(new Path(e.getKey().partition.getSd().getLocation()), true); } } } @@ -3251,6 +3258,16 @@ private void initializeAddedPartition( part.getParameters().get(hive_metastoreConstants.DDL_TIME) == null) { part.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(time)); } + } + + private void initializePartitionParameters(final Table tbl, final Partition part) + throws MetaException { + initializePartitionParameters(tbl, + new PartitionSpecProxy.SimplePartitionWrapperIterator(part)); + } + + private void initializePartitionParameters(final Table tbl, + final PartitionSpecProxy.PartitionIterator part) throws MetaException { // Inherit table properties into partition properties. Map tblParams = tbl.getParameters(); @@ -3291,6 +3308,7 @@ private Partition add_partition_core(final RawStore ms, boolean madeDir = createLocationForAddedPartition(tbl, part); try { initializeAddedPartition(tbl, part, madeDir); + initializePartitionParameters(tbl, part); success = ms.addPartition(part); } finally { if (!success && madeDir) { diff --git standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitions.java standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitions.java index 4d9cb1b..4923266 100644 --- standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitions.java +++ standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitions.java @@ -19,7 +19,9 @@ package org.apache.hadoop.hive.metastore.client; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.IMetaStoreClient; @@ -364,21 +366,39 @@ public void testAddPartitionNullLocationInTableToo() throws Exception { @Test(expected = MetaException.class) public void testAddPartitionForView() throws Exception { - Table table = new TableBuilder() - .setDbName(DB_NAME) - .setTableName(TABLE_NAME) - .setType("VIRTUAL_VIEW") - .addCol("test_id", "int", "test col id") - .addCol("test_value", DEFAULT_COL_TYPE, "test col value") - .addPartCol(YEAR_COL_NAME, DEFAULT_COL_TYPE) - .setLocation(null) - .build(); - client.createTable(table); - Partition partition = buildPartition(DB_NAME, TABLE_NAME, DEFAULT_YEAR_VALUE); + String tableName = "test_add_partition_view"; + createView(tableName); + Partition partition = buildPartition(DB_NAME, tableName, DEFAULT_YEAR_VALUE); client.add_partition(partition); } @Test + public void testAddPartitionsForViewNullPartLocation() throws Exception { + + String tableName = "test_add_partition_view"; + createView(tableName); + Partition partition = buildPartition(DB_NAME, tableName, DEFAULT_YEAR_VALUE); + partition.getSd().setLocation(null); + List partitions = Lists.newArrayList(partition); + client.add_partitions(partitions); + Partition part = client.getPartition(DB_NAME, tableName, "year=2017"); + Assert.assertNull(part.getSd().getLocation()); + } + + @Test + public void testAddPartitionsForViewNullPartSd() throws Exception { + + String tableName = "test_add_partition_view"; + createView(tableName); + Partition partition = buildPartition(DB_NAME, tableName, DEFAULT_YEAR_VALUE); + partition.setSd(null); + List partitions = Lists.newArrayList(partition); + client.add_partitions(partitions); + Partition part = client.getPartition(DB_NAME, tableName, "year=2017"); + Assert.assertNull(part.getSd()); + } + + @Test public void testAddPartitionForExternalTable() throws Exception { String tableName = "part_add_ext_table"; @@ -668,20 +688,26 @@ public void testAddPartitionsDifferentDBs() throws Exception { client.dropDatabase("parttestdb2", true, true, true); } - @Test(expected = MetaException.class) + @Test public void testAddPartitionsDuplicateInTheList() throws Exception { createTable(); + List partitions = buildPartitions(DB_NAME, TABLE_NAME, + Lists.newArrayList("2014", "2015", "2017", "2017", "2018", "2019")); - Partition partition1 = buildPartition(DB_NAME, TABLE_NAME, "2017"); - Partition partition2 = buildPartition(DB_NAME, TABLE_NAME, "2016"); - Partition partition3 = buildPartition(DB_NAME, TABLE_NAME, "2017"); + try { + client.add_partitions(partitions); + Assert.fail("MetaException should have happened."); + } catch (MetaException e) { + // Expected exception + } - List partitions = new ArrayList<>(); - partitions.add(partition1); - partitions.add(partition2); - partitions.add(partition3); - client.add_partitions(partitions); + List parts = client.listPartitions(DB_NAME, TABLE_NAME, MAX); + Assert.assertNotNull(parts); + Assert.assertTrue(parts.isEmpty()); + for (Partition partition : partitions) { + Assert.assertFalse(metaStore.isPathExists(new Path(partition.getSd().getLocation()))); + } } @Test @@ -706,22 +732,32 @@ public void testAddPartitionsWithSameNameInTheListCaseSensitive() throws Excepti Assert.assertTrue(parts.contains("year=THIS")); } - @Test(expected = AlreadyExistsException.class) + @Test public void testAddPartitionsAlreadyExists() throws Exception { createTable(); - Partition partition = buildPartition(DB_NAME, TABLE_NAME, "2017"); + String tableLocation = metaStore.getWarehouseRoot() + "/" + TABLE_NAME; + Partition partition = + buildPartition(DB_NAME, TABLE_NAME, "2016", tableLocation + "/year=2016a"); client.add_partition(partition); - Partition partition1 = buildPartition(DB_NAME, TABLE_NAME, "2015"); - Partition partition2 = buildPartition(DB_NAME, TABLE_NAME, "2017"); - Partition partition3 = buildPartition(DB_NAME, TABLE_NAME, "2016"); + List partitions = buildPartitions(DB_NAME, TABLE_NAME, + Lists.newArrayList("2014", "2015", "2016", "2017", "2018")); - List partitions = new ArrayList<>(); - partitions.add(partition1); - partitions.add(partition2); - partitions.add(partition3); - client.add_partitions(partitions); + try { + client.add_partitions(partitions); + Assert.fail("AlreadyExistsException should have happened."); + } catch (AlreadyExistsException e) { + // Expected exception + } + + List parts = client.listPartitions(DB_NAME, TABLE_NAME, MAX); + Assert.assertNotNull(parts); + Assert.assertEquals(1, parts.size()); + Assert.assertEquals(partition.getValues(), parts.get(0).getValues()); + for (Partition part : partitions) { + Assert.assertFalse(metaStore.isPathExists(new Path(part.getSd().getLocation()))); + } } @Test(expected = MetaException.class) @@ -809,16 +845,24 @@ public void testAddPartitionsOneInvalid() throws Exception { createTable(); String tableLocation = metaStore.getWarehouseRoot() + "/" + TABLE_NAME; - Partition partition1 = buildPartition(DB_NAME, TABLE_NAME, "2016", tableLocation + "/year=2016"); - Partition partition2 = buildPartition(DB_NAME, TABLE_NAME, "2017", tableLocation + "/year=2017"); + Partition partition1 = + buildPartition(DB_NAME, TABLE_NAME, "2016", tableLocation + "/year=2016"); + Partition partition2 = + buildPartition(DB_NAME, TABLE_NAME, "2017", tableLocation + "/year=2017"); Partition partition3 = buildPartition(Lists.newArrayList("2015", "march"), getYearAndMonthPartCols(), 1); partition3.getSd().setLocation(tableLocation + "/year=2015/month=march"); + Partition partition4 = + buildPartition(DB_NAME, TABLE_NAME, "2018", tableLocation + "/year=2018"); + Partition partition5 = + buildPartition(DB_NAME, TABLE_NAME, "2019", tableLocation + "/year=2019"); List partitions = new ArrayList<>(); partitions.add(partition1); partitions.add(partition2); partitions.add(partition3); + partitions.add(partition4); + partitions.add(partition5); try { client.add_partitions(partitions); @@ -830,15 +874,9 @@ public void testAddPartitionsOneInvalid() throws Exception { List parts = client.listPartitions(DB_NAME, TABLE_NAME, MAX); Assert.assertNotNull(parts); Assert.assertTrue(parts.isEmpty()); - // TODO: This does not work correctly. None of the partitions is created, but the folder - // for the first two is created. It is because in HiveMetaStore.add_partitions_core when - // going through the partitions, the first two are already put and started in the thread - // pool when the exception occurs in the third one. When the exception occurs, we go to - // the finally part, but the map can be empty (it depends on the progress of the other - // threads) so the folders won't be deleted. -// Assert.assertFalse(metaStore.isPathExists(new Path(tableLocation + "/year=2016"))); -// Assert.assertFalse(metaStore.isPathExists(new Path(tableLocation + "/year=2017"))); - Assert.assertFalse(metaStore.isPathExists(new Path(tableLocation + "/year=2015/month=march"))); + for (Partition part : partitions) { + Assert.assertFalse(metaStore.isPathExists(new Path(part.getSd().getLocation()))); + } } @Test(expected = MetaException.class) @@ -968,17 +1006,9 @@ public void testAddPartitionsNullLocationInTableToo() throws Exception { @Test(expected=MetaException.class) public void testAddPartitionsForView() throws Exception { - Table table = new TableBuilder() - .setDbName(DB_NAME) - .setTableName(TABLE_NAME) - .setType("VIRTUAL_VIEW") - .addCol("test_id", "int", "test col id") - .addCol("test_value", "string", "test col value") - .addPartCol(YEAR_COL_NAME, DEFAULT_COL_TYPE) - .setLocation(null) - .build(); - client.createTable(table); - Partition partition = buildPartition(DB_NAME, TABLE_NAME, DEFAULT_YEAR_VALUE); + String tableName = "test_add_partition_view"; + createView(tableName); + Partition partition = buildPartition(DB_NAME, tableName, DEFAULT_YEAR_VALUE); List partitions = Lists.newArrayList(partition); client.add_partitions(partitions); } @@ -1102,6 +1132,70 @@ public void testAddPartitionsEmptyValue() throws Exception { Assert.assertEquals("year=__HIVE_DEFAULT_PARTITION__", partitionNames.get(0)); } + @Test + public void testAddPartitionsInvalidLocation() throws Exception { + + createTable(); + String tableLocation = metaStore.getWarehouseRoot() + "/" + TABLE_NAME; + Map valuesAndLocations = new HashMap<>(); + valuesAndLocations.put("2014", tableLocation + "/year=2014"); + valuesAndLocations.put("2015", tableLocation + "/year=2015"); + valuesAndLocations.put("2016", "invalidhost:80000/wrongfolder"); + valuesAndLocations.put("2017", tableLocation + "/year=2017"); + valuesAndLocations.put("2018", tableLocation + "/year=2018"); + List partitions = buildPartitions(DB_NAME, TABLE_NAME, valuesAndLocations); + + try { + client.add_partitions(partitions); + Assert.fail("MetaException should have happened."); + } catch (MetaException e) { + + } + + List parts = client.listPartitions(DB_NAME, TABLE_NAME, MAX); + Assert.assertNotNull(parts); + Assert.assertTrue(parts.isEmpty()); + for (Partition partition : partitions) { + if (!"invalidhost:80000/wrongfolder".equals(partition.getSd().getLocation())) { + Assert.assertFalse(metaStore.isPathExists(new Path(partition.getSd().getLocation()))); + } + } + } + + @Test + public void testAddPartitionsMoreThanThreadCountsOneFails() throws Exception { + + createTable(); + String tableLocation = metaStore.getWarehouseRoot() + "/" + TABLE_NAME; + + List partitions = new ArrayList<>(); + for (int i = 0; i < 50; i++) { + String value = String.valueOf(2000 + i); + String location = tableLocation + "/year=" + value; + if (i == 30) { + location = "invalidhost:80000/wrongfolder"; + } + Partition partition = buildPartition(DB_NAME, TABLE_NAME, value, location); + partitions.add(partition); + } + + try { + client.add_partitions(partitions); + Assert.fail("MetaException should have happened."); + } catch (MetaException e) { + + } + + List parts = client.listPartitions(DB_NAME, TABLE_NAME, MAX); + Assert.assertNotNull(parts); + Assert.assertTrue(parts.isEmpty()); + for (Partition partition : partitions) { + if (!"invalidhost:80000/wrongfolder".equals(partition.getSd().getLocation())) { + Assert.assertFalse(metaStore.isPathExists(new Path(partition.getSd().getLocation()))); + } + } + } + // Tests for List add_partitions(List partitions, // boolean ifNotExists, boolean needResults) method @@ -1467,4 +1561,44 @@ private void verifyPartitionAttributesDefaultValues(Partition partition, String Assert.assertTrue("Per default the skewedInfo column value location map should be empty.", skewedInfo.getSkewedColValueLocationMaps().isEmpty()); } + + private List buildPartitions(String dbName, String tableName, List values) + throws MetaException { + + String tableLocation = metaStore.getWarehouseRoot() + "/" + tableName; + List partitions = new ArrayList<>(); + + for (String value : values) { + Partition partition = + buildPartition(dbName, tableName, value, tableLocation + "/year=" + value); + partitions.add(partition); + } + return partitions; + } + + private List buildPartitions(String dbName, String tableName, + Map valuesAndLocations) throws MetaException { + + List partitions = new ArrayList<>(); + + for (Map.Entry valueAndLocation : valuesAndLocations.entrySet()) { + Partition partition = + buildPartition(dbName, tableName, valueAndLocation.getKey(), valueAndLocation.getValue()); + partitions.add(partition); + } + return partitions; + } + + private void createView(String tableName) throws Exception { + Table table = new TableBuilder() + .setDbName(DB_NAME) + .setTableName(tableName) + .setType("VIRTUAL_VIEW") + .addCol("test_id", "int", "test col id") + .addCol("test_value", "string", "test col value") + .addPartCol(YEAR_COL_NAME, DEFAULT_COL_TYPE) + .setLocation(null) + .build(); + client.createTable(table); + } } diff --git standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitionsFromPartSpec.java standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitionsFromPartSpec.java index 1122057..694042d 100644 --- standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitionsFromPartSpec.java +++ standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitionsFromPartSpec.java @@ -518,31 +518,57 @@ public void testAddPartitionSpecWithSharedSDWithoutRelativePath() throws Excepti Assert.assertTrue(metaStore.isPathExists(new Path(part.getSd().getLocation()))); } - @Test(expected = AlreadyExistsException.class) + @Test public void testAddPartitionSpecPartAlreadyExists() throws Exception { createTable(); - Partition partition = buildPartition(DB_NAME, TABLE_NAME, DEFAULT_YEAR_VALUE); + String tableLocation = metaStore.getWarehouseRoot() + "/" + TABLE_NAME; + Partition partition = + buildPartition(DB_NAME, TABLE_NAME, "2016", tableLocation + "/year=2016a"); client.add_partition(partition); - Partition newPartition = buildPartition(DB_NAME, TABLE_NAME, DEFAULT_YEAR_VALUE); + List partitions = buildPartitions(DB_NAME, TABLE_NAME, + Lists.newArrayList("2014", "2015", "2016", "2017", "2018")); PartitionSpecProxy partitionSpecProxy = - buildPartitionSpec(DB_NAME, TABLE_NAME, null, Lists.newArrayList(newPartition)); - client.add_partitions_pspec(partitionSpecProxy); + buildPartitionSpec(DB_NAME, TABLE_NAME, null, partitions); + + try { + client.add_partitions_pspec(partitionSpecProxy); + Assert.fail("AlreadyExistsException should have happened."); + } catch (AlreadyExistsException e) { + // Expected exception + } + + List parts = client.listPartitions(DB_NAME, TABLE_NAME, MAX); + Assert.assertNotNull(parts); + Assert.assertEquals(1, parts.size()); + Assert.assertEquals(partition.getValues(), parts.get(0).getValues()); + for (Partition part : partitions) { + Assert.assertFalse(metaStore.isPathExists(new Path(part.getSd().getLocation()))); + } } - @Test(expected = MetaException.class) + @Test public void testAddPartitionSpecPartDuplicateInSpec() throws Exception { createTable(); - Partition partition1 = buildPartition(DB_NAME, TABLE_NAME, DEFAULT_YEAR_VALUE); - Partition partition2 = buildPartition(DB_NAME, TABLE_NAME, DEFAULT_YEAR_VALUE); - List partitions = new ArrayList<>(); - partitions.add(partition1); - partitions.add(partition2); + List partitions = buildPartitions(DB_NAME, TABLE_NAME, + Lists.newArrayList("2014", "2015", "2017", "2017", "2018", "2019")); PartitionSpecProxy partitionSpecProxy = buildPartitionSpec(DB_NAME, TABLE_NAME, null, partitions); - client.add_partitions_pspec(partitionSpecProxy); + try { + client.add_partitions_pspec(partitionSpecProxy); + Assert.fail("MetaException should have happened."); + } catch (MetaException e) { + // Expected exception + } + + List parts = client.listPartitions(DB_NAME, TABLE_NAME, MAX); + Assert.assertNotNull(parts); + Assert.assertTrue(parts.isEmpty()); + for (Partition partition : partitions) { + Assert.assertFalse(metaStore.isPathExists(new Path(partition.getSd().getLocation()))); + } } @Test(expected = MetaException.class) @@ -671,21 +697,40 @@ public void testAddPartitionSpecEmptyLocationInTableToo() throws Exception { @Test(expected=MetaException.class) public void testAddPartitionSpecForView() throws Exception { - Table table = new TableBuilder() - .setDbName(DB_NAME) - .setTableName(TABLE_NAME) - .setType("VIRTUAL_VIEW") - .addCol("test_id", "int", "test col id") - .addCol("test_value", DEFAULT_COL_TYPE, "test col value") - .addPartCol(YEAR_COL_NAME, DEFAULT_COL_TYPE) - .setLocation(null) - .build(); - client.createTable(table); + String tableName = "test_add_partition_view"; + createView(tableName); + Partition partition = buildPartition(DB_NAME, tableName, DEFAULT_YEAR_VALUE); + PartitionSpecProxy partitionSpecProxy = + buildPartitionSpec(DB_NAME, tableName, null, Lists.newArrayList(partition)); + client.add_partitions_pspec(partitionSpecProxy); + } - Partition partition = buildPartition(DB_NAME, TABLE_NAME, DEFAULT_YEAR_VALUE); + @Test + public void testAddPartitionSpecForViewNullPartLocation() throws Exception { + + String tableName = "test_add_partition_view"; + createView(tableName); + Partition partition = buildPartition(DB_NAME, tableName, DEFAULT_YEAR_VALUE); + partition.getSd().setLocation(null); PartitionSpecProxy partitionSpecProxy = - buildPartitionSpec(DB_NAME, TABLE_NAME, null, Lists.newArrayList(partition)); + buildPartitionSpec(DB_NAME, tableName, null, Lists.newArrayList(partition)); + client.add_partitions_pspec(partitionSpecProxy); + Partition part = client.getPartition(DB_NAME, tableName, "year=2017"); + Assert.assertNull(part.getSd().getLocation()); + } + + @Test + public void testAddPartitionsForViewNullPartSd() throws Exception { + + String tableName = "test_add_partition_view"; + createView(tableName); + Partition partition = buildPartition(DB_NAME, tableName, DEFAULT_YEAR_VALUE); + partition.setSd(null); + PartitionSpecProxy partitionSpecProxy = + buildPartitionSpec(DB_NAME, tableName, null, Lists.newArrayList(partition)); client.add_partitions_pspec(partitionSpecProxy); + Partition part = client.getPartition(DB_NAME, tableName, "year=2017"); + Assert.assertNull(part.getSd()); } @Test @@ -790,33 +835,104 @@ public void testAddPartitionSpecWithSharedSDNoRelativePath() throws Exception { public void testAddPartitionSpecOneInvalid() throws Exception { createTable(); - Partition partition1 = buildPartition(DB_NAME, TABLE_NAME, "2016"); - Partition partition2 = buildPartition(DB_NAME, TABLE_NAME, "2017"); + String tableLocation = metaStore.getWarehouseRoot() + "/" + TABLE_NAME; + Partition partition1 = + buildPartition(DB_NAME, TABLE_NAME, "2016", tableLocation + "/year=2016"); + Partition partition2 = + buildPartition(DB_NAME, TABLE_NAME, "2017", tableLocation + "/year=2017"); Partition partition3 = buildPartition(Lists.newArrayList("2015", "march"), getYearAndMonthPartCols(), 1); - partition3.getSd().setLocation(metaStore.getWarehouseRoot() + "/addparttest"); - List partitions = Lists.newArrayList(partition1, partition2, partition3); + partition3.getSd().setLocation(tableLocation + "/year=2015/month=march"); + Partition partition4 = + buildPartition(DB_NAME, TABLE_NAME, "2018", tableLocation + "/year=2018"); + Partition partition5 = + buildPartition(DB_NAME, TABLE_NAME, "2019", tableLocation + "/year=2019"); + List partitions = + Lists.newArrayList(partition1, partition2, partition3, partition4, partition5); PartitionSpecProxy partitionSpecProxy = buildPartitionSpec(DB_NAME, TABLE_NAME, null, partitions); + try { client.add_partitions_pspec(partitionSpecProxy); - Assert.fail("MetaException should have occurred."); + Assert.fail("MetaException should have happened."); } catch (MetaException e) { - // This is expected + // Expected exception } List parts = client.listPartitions(DB_NAME, TABLE_NAME, MAX); Assert.assertNotNull(parts); Assert.assertTrue(parts.isEmpty()); - // TODO: This does not work correctly. None of the partitions is created, but the folder - // for the first two is created. It is because in HiveMetaStore.add_partitions_core when - // going through the partitions, the first two are already put and started in the thread - // pool when the exception occurs in the third one. - // When the exception occurs, we go to the finally part, but the map can be empty - // (it depends on the progress of the other threads) so the folders won't be deleted. - // Assert.assertTrue(metaStore.isPathExists(new Path(partition1.getSd().getLocation()))); - // Assert.assertTrue(metaStore.isPathExists(new Path(partition2.getSd().getLocation()))); - // Assert.assertTrue(metaStore.isPathExists(new Path(partition3.getSd().getLocation()))); + for (Partition part : partitions) { + Assert.assertFalse(metaStore.isPathExists(new Path(part.getSd().getLocation()))); + } + } + + @Test + public void testAddPartitionSpecInvalidLocation() throws Exception { + + createTable(); + String tableLocation = metaStore.getWarehouseRoot() + "/" + TABLE_NAME; + Map valuesAndLocations = new HashMap<>(); + valuesAndLocations.put("2014", tableLocation + "/year=2014"); + valuesAndLocations.put("2015", tableLocation + "/year=2015"); + valuesAndLocations.put("2016", "invalidhost:80000/wrongfolder"); + valuesAndLocations.put("2017", tableLocation + "/year=2017"); + valuesAndLocations.put("2018", tableLocation + "/year=2018"); + List partitions = buildPartitions(DB_NAME, TABLE_NAME, valuesAndLocations); + PartitionSpecProxy partitionSpecProxy = + buildPartitionSpec(DB_NAME, TABLE_NAME, null, partitions); + + try { + client.add_partitions_pspec(partitionSpecProxy); + Assert.fail("MetaException should have happened."); + } catch (MetaException e) { + // Expected exception + } + + List parts = client.listPartitions(DB_NAME, TABLE_NAME, MAX); + Assert.assertNotNull(parts); + Assert.assertTrue(parts.isEmpty()); + for (Partition partition : partitions) { + if (!"invalidhost:80000/wrongfolder".equals(partition.getSd().getLocation())) { + Assert.assertFalse(metaStore.isPathExists(new Path(partition.getSd().getLocation()))); + } + } + } + + @Test + public void testAddPartitionSpecMoreThanThreadCountsOneFails() throws Exception { + + createTable(); + String tableLocation = metaStore.getWarehouseRoot() + "/" + TABLE_NAME; + + List partitions = new ArrayList<>(); + for (int i = 0; i < 50; i++) { + String value = String.valueOf(2000 + i); + String location = tableLocation + "/year=" + value; + if (i == 30) { + location = "invalidhost:80000/wrongfolder"; + } + Partition partition = buildPartition(DB_NAME, TABLE_NAME, value, location); + partitions.add(partition); + } + PartitionSpecProxy partitionSpecProxy = + buildPartitionSpec(DB_NAME, TABLE_NAME, null, partitions); + + try { + client.add_partitions_pspec(partitionSpecProxy); + Assert.fail("MetaException should have happened."); + } catch (MetaException e) { + // Expected exception + } + + List parts = client.listPartitions(DB_NAME, TABLE_NAME, MAX); + Assert.assertNotNull(parts); + Assert.assertTrue(parts.isEmpty()); + for (Partition partition : partitions) { + if (!"invalidhost:80000/wrongfolder".equals(partition.getSd().getLocation())) { + Assert.assertFalse(metaStore.isPathExists(new Path(partition.getSd().getLocation()))); + } + } } // Helper methods @@ -1051,4 +1167,44 @@ private void verifyPartitionSharedSD(Table table, String name, List valu sd.getLocation()); Assert.assertTrue(metaStore.isPathExists(new Path(sd.getLocation()))); } + + private List buildPartitions(String dbName, String tableName, List values) + throws MetaException { + + String tableLocation = metaStore.getWarehouseRoot() + "/" + tableName; + List partitions = new ArrayList<>(); + + for (String value : values) { + Partition partition = + buildPartition(dbName, tableName, value, tableLocation + "/year=" + value); + partitions.add(partition); + } + return partitions; + } + + private List buildPartitions(String dbName, String tableName, + Map valuesAndLocations) throws MetaException { + + List partitions = new ArrayList<>(); + + for (Map.Entry valueAndLocation : valuesAndLocations.entrySet()) { + Partition partition = + buildPartition(dbName, tableName, valueAndLocation.getKey(), valueAndLocation.getValue()); + partitions.add(partition); + } + return partitions; + } + + private void createView(String tableName) throws Exception { + Table table = new TableBuilder() + .setDbName(DB_NAME) + .setTableName(tableName) + .setType("VIRTUAL_VIEW") + .addCol("test_id", "int", "test col id") + .addCol("test_value", "string", "test col value") + .addPartCol(YEAR_COL_NAME, DEFAULT_COL_TYPE) + .setLocation(null) + .build(); + client.createTable(table); + } }