diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
index 640b567ebf9af1af5a3dd9fa1442eb5c5b8ef1a4..1cf47c36cb490ce0b17ffe312cd2e9fc4bb7cd9a 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
@@ -32,6 +32,8 @@
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -574,6 +576,86 @@ public void dropPartition() throws Exception {
   }
 
   @Test
+  public void exchangePartition() throws Exception {
+    String dbName = "default";
+    List<FieldSchema> cols = new ArrayList<FieldSchema>();
+    cols.add(new FieldSchema("col1", "int", "nocomment"));
+    List<FieldSchema> partCols = new ArrayList<FieldSchema>();
+    partCols.add(new FieldSchema("part", "int", ""));
+    SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+    StorageDescriptor sd1 = new StorageDescriptor(cols, "file:/tmp/1", "input", "output", false, 0,
+        serde, null, null, emptyParameters);
+    Table tab1 = new Table("tab1", dbName, "me", startTime, startTime, 0, sd1, partCols,
+        emptyParameters, null, null, null);
+    msClient.createTable(tab1);
+    NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
+    assertEquals(1, rsp.getEventsSize()); // add_table
+
+    StorageDescriptor sd2 = new StorageDescriptor(cols, "file:/tmp/2", "input", "output", false, 0,
+        serde, null, null, emptyParameters);
+    Table tab2 = new Table("tab2", dbName, "me", startTime, startTime, 0, sd2, partCols,
+        emptyParameters, null, null, null); // add_table
+    msClient.createTable(tab2);
+    rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
+    assertEquals(1, rsp.getEventsSize());
+
+    StorageDescriptor sd1part = new StorageDescriptor(cols, "file:/tmp/1/part=1", "input", "output", false, 0,
+        serde, null, null, emptyParameters);
+    StorageDescriptor sd2part = new StorageDescriptor(cols, "file:/tmp/1/part=2", "input", "output", false, 0,
+        serde, null, null, emptyParameters);
+    StorageDescriptor sd3part = new StorageDescriptor(cols, "file:/tmp/1/part=3", "input", "output", false, 0,
+        serde, null, null, emptyParameters);
+    Partition part1 = new Partition(Arrays.asList("1"), "default", tab1.getTableName(),
+        startTime, startTime, sd1part, emptyParameters);
+    Partition part2 = new Partition(Arrays.asList("2"), "default", tab1.getTableName(),
+        startTime, startTime, sd2part, emptyParameters);
+    Partition part3 = new Partition(Arrays.asList("3"), "default", tab1.getTableName(),
+        startTime, startTime, sd3part, emptyParameters);
+    msClient.add_partitions(Arrays.asList(part1, part2, part3));
+    rsp = msClient.getNextNotification(firstEventId + 2, 0, null);
+    assertEquals(1, rsp.getEventsSize()); // add_partition
+
+    msClient.exchange_partition(ImmutableMap.of("part", "1"),
+        dbName, tab1.getTableName(), dbName, tab2.getTableName());
+
+    rsp = msClient.getNextNotification(firstEventId + 3, 0, null);
+    assertEquals(2, rsp.getEventsSize());
+
+    NotificationEvent event = rsp.getEvents().get(0);
+    assertEquals(firstEventId + 4, event.getEventId());
+    assertTrue(event.getEventTime() >= startTime);
+    assertEquals(EventType.ADD_PARTITION.toString(), event.getEventType());
+    assertEquals(dbName, event.getDbName());
+    assertEquals(tab2.getTableName(), event.getTableName());
+
+    // Parse the message field
+    AddPartitionMessage addPtnMsg = md.getAddPartitionMessage(event.getMessage());
+    assertEquals(dbName, addPtnMsg.getDB());
+    assertEquals(tab2.getTableName(), addPtnMsg.getTable());
+    Iterator<Partition> ptnIter = addPtnMsg.getPartitionObjs().iterator();
+    assertTrue(ptnIter.hasNext());
+    Partition msgPart = ptnIter.next();
+    assertEquals(part1.getValues(), msgPart.getValues());
+    assertEquals(dbName, msgPart.getDbName());
+    assertEquals(tab2.getTableName(), msgPart.getTableName());
+
+    event = rsp.getEvents().get(1);
+    assertEquals(firstEventId + 5, event.getEventId());
+    assertTrue(event.getEventTime() >= startTime);
+    assertEquals(EventType.DROP_PARTITION.toString(), event.getEventType());
+    assertEquals(dbName, event.getDbName());
+    assertEquals(tab1.getTableName(), event.getTableName());
+
+    // Parse the message field
+    DropPartitionMessage dropPtnMsg = md.getDropPartitionMessage(event.getMessage());
+    assertEquals(dbName, dropPtnMsg.getDB());
+    assertEquals(tab1.getTableName(), dropPtnMsg.getTable());
+    Iterator<Map<String, String>> parts = dropPtnMsg.getPartitions().iterator();
+    assertTrue(parts.hasNext());
+    assertEquals(part1.getValues(), Lists.newArrayList(parts.next().values()));
+  }
+
+  @Test
 public void createFunction() throws Exception {
   String defaultDbName = "default";
   String funcName = "createfunction";
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 13d0aab40eaf344869b8230614b55115f64857e6..8d91e0b3efb813b30bd8a413d29f364004e09e14 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -58,6 +58,7 @@
 import javax.jdo.JDOException;
 
 import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -2937,8 +2938,8 @@ public Partition exchange_partition(Map<String, String> partitionSpecs,
           Warehouse.makePartName(partitionKeysPresent, partValsPresent));
       Path destPath = new Path(destinationTable.getSd().getLocation(),
           Warehouse.makePartName(partitionKeysPresent, partValsPresent));
+      List<Partition> destPartitions = new ArrayList<Partition>();
       try {
-        List<Partition> destPartitions = new ArrayList<Partition>();
         for (Partition partition: partitionsToExchange) {
           Partition destPartition = new Partition(partition);
           destPartition.setDbName(destDbName);
@@ -2962,6 +2963,12 @@ public Partition exchange_partition(Map<String, String> partitionSpecs,
          * once https://issues.apache.org/jira/browse/HDFS-3370 is done
          */
         pathCreated = wh.renameDir(sourcePath, destPath);
+
+        // Setting success to false to make sure that if the listener fails, rollback happens.
+        success = false;
+        fireMetaStoreExchangePartitionEvent(sourceTable, partitionsToExchange,
+            destinationTable, destPartitions, transactionalListeners, true);
+
         success = ms.commitTransaction();
         return destPartitions;
       } finally {
@@ -2970,6 +2977,35 @@ public Partition exchange_partition(Map<String, String> partitionSpecs,
           if (pathCreated) {
             wh.renameDir(destPath, sourcePath);
           }
+
+          fireMetaStoreExchangePartitionEvent(sourceTable, partitionsToExchange,
+              destinationTable, destPartitions, listeners, success);
+        }
+      }
+    }
+
+    private void fireMetaStoreExchangePartitionEvent(Table sourceTable,
+        List<Partition> partitionsToExchange, Table destinationTable,
+        List<Partition> destPartitions,
+        List<MetaStoreEventListener> eventListeners,
+        boolean status) throws MetaException {
+      if (sourceTable != null && destinationTable != null
+          && !CollectionUtils.isEmpty(partitionsToExchange)
+          && !CollectionUtils.isEmpty(destPartitions)) {
+        if (eventListeners.size() > 0) {
+          AddPartitionEvent addPartitionEvent =
+              new AddPartitionEvent(destinationTable, destPartitions, status, this);
+          for (MetaStoreEventListener eventListener : eventListeners) {
+            eventListener.onAddPartition(addPartitionEvent);
+          }
+
+          for (Partition partition : partitionsToExchange) {
+            DropPartitionEvent dropPartitionEvent =
+                new DropPartitionEvent(sourceTable, partition, true, status, this);
+            for (MetaStoreEventListener eventListener : eventListeners) {
+              eventListener.onDropPartition(dropPartitionEvent);
+            }
+          }
         }
       }
     }