diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java index 37228b0..da4ae09 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java @@ -132,8 +132,10 @@ private HCatConstants() { // restrict instantiation public static final String HCAT_EVENT = "HCAT_EVENT"; public static final String HCAT_ADD_PARTITION_EVENT = "ADD_PARTITION"; public static final String HCAT_DROP_PARTITION_EVENT = "DROP_PARTITION"; + public static final String HCAT_ALTER_PARTITION_EVENT = "ALTER_PARTITION"; public static final String HCAT_PARTITION_DONE_EVENT = "PARTITION_DONE"; public static final String HCAT_CREATE_TABLE_EVENT = "CREATE_TABLE"; + public static final String HCAT_ALTER_TABLE_EVENT = "ALTER_TABLE"; public static final String HCAT_DROP_TABLE_EVENT = "DROP_TABLE"; public static final String HCAT_CREATE_DATABASE_EVENT = "CREATE_DATABASE"; public static final String HCAT_DROP_DATABASE_EVENT = "DROP_DATABASE"; diff --git hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index 6a6ae5f..3ea2827 100644 --- hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; @@ -140,7 +141,7 @@ public void onDropTable (DropTableEvent tableEvent) throws MetaException { * @throws MetaException */ public void onAlterTable (AlterTableEvent tableEvent) throws MetaException { - /*Table before = tableEvent.getOldTable(); + Table before = tableEvent.getOldTable(); Table after = tableEvent.getNewTable(); NotificationEvent event = new NotificationEvent(0, now(), HCatConstants.HCAT_ALTER_TABLE_EVENT, @@ -149,8 +150,7 @@ public void onAlterTable (AlterTableEvent tableEvent) throws MetaException { event.setDbName(after.getDbName()); event.setTableName(after.getTableName()); enqueue(event); - }*/ - // TODO - once HIVE-9175 is committed + } } /** @@ -187,7 +187,16 @@ public void onDropPartition (DropPartitionEvent partitionEvent) throws MetaExce * @throws MetaException */ public void onAlterPartition (AlterPartitionEvent partitionEvent) throws MetaException { - // TODO, MessageFactory doesn't support Alter Partition yet. + Partition before = partitionEvent.getOldPartition(); + Partition after = partitionEvent.getNewPartition(); + NotificationEvent event = new NotificationEvent(0, now(), + HCatConstants.HCAT_ALTER_PARTITION_EVENT, + msgFactory.buildAlterPartitionMessage(before, after).toString()); + if (event != null) { + event.setDbName(before.getDbName()); + event.setTableName(before.getTableName()); + enqueue(event); + } } /** diff --git hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java index a72c6b5..c383047 100644 --- hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java +++ hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java @@ -60,6 +60,7 @@ import org.apache.hadoop.hive.metastore.events.ListenerEvent; import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent; import org.apache.hive.hcatalog.common.HCatConstants; +import org.apache.hive.hcatalog.messaging.AlterTableMessage; import org.apache.hive.hcatalog.messaging.HCatEventMessage; import org.apache.hive.hcatalog.messaging.MessageFactory; import org.slf4j.Logger; @@ -116,7 +117,7 @@ public NotificationListener(final Configuration conf) { testAndCreateConnection(); } - private static String getTopicName(Table table, ListenerEvent partitionEvent) { + private static String getTopicName(Table table) { return table.getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME); } @@ -129,7 +130,7 @@ public void onAddPartition(AddPartitionEvent partitionEvent) if (partitionEvent.getStatus()) { Table table = partitionEvent.getTable(); List partitions = partitionEvent.getPartitions(); - String topicName = getTopicName(table, partitionEvent); + String topicName = getTopicName(table); if (topicName != null && !topicName.equals("")) { send(messageFactory.buildAddPartitionMessage(table, partitions), topicName); } else { @@ -144,6 +145,17 @@ public void onAddPartition(AddPartitionEvent partitionEvent) } } + @Override + public void onAlterPartition(AlterPartitionEvent ape) throws MetaException { + if (ape.getStatus()) { + Partition before = ape.getOldPartition(); + Partition after = ape.getNewPartition(); + + String topicName = getTopicName(ape.getTable()); + send(messageFactory.buildAlterPartitionMessage(before, after), topicName); + } + } + /** * Send dropped partition notifications. Subscribers can receive these notifications for a * particular table by listening on a topic named "dbName.tableName" with message selector @@ -165,7 +177,7 @@ public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaExcept sd.setParameters(new HashMap()); sd.getSerdeInfo().setParameters(new HashMap()); sd.getSkewedInfo().setSkewedColNames(new ArrayList()); - String topicName = getTopicName(partitionEvent.getTable(), partitionEvent); + String topicName = getTopicName(partitionEvent.getTable()); if (topicName != null && !topicName.equals("")) { send(messageFactory.buildDropPartitionMessage(partitionEvent.getTable(), partition), topicName); } else { @@ -241,6 +253,35 @@ private String getTopicPrefix(Configuration conf) { } /** + * Send altered table notifications. Subscribers can receive these notifications for + * dropped tables by listening on topic "HCAT" with message selector string + * {@value org.apache.hive.hcatalog.common.HCatConstants#HCAT_EVENT} = + * {@value org.apache.hive.hcatalog.common.HCatConstants#HCAT_ALTER_TABLE_EVENT} + */ + @Override + public void onAlterTable(AlterTableEvent tableEvent) throws MetaException { + if (tableEvent.getStatus()) { + Table before = tableEvent.getOldTable(); + Table after = tableEvent.getNewTable(); + + // onCreateTable alters the table to add the topic name. Since this class is generating + // that alter, we don't want to notify on that alter. So take a quick look and see if + // that's what this this alter is, and if so swallow it. + if (after.getParameters() != null && + after.getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME) != null && + (before.getParameters() == null || + before.getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME) == null)) { + return; + } + // I think this is wrong, the alter table statement should come on the table topic not the + // DB topic - Alan. + String topicName = getTopicPrefix(tableEvent.getHandler().getHiveConf()) + "." + + after.getDbName().toLowerCase(); + send(messageFactory.buildAlterTableMessage(before, after), topicName); + } + } + + /** * Send dropped table notifications. Subscribers can receive these notifications for * dropped tables by listening on topic "HCAT" with message selector string * {@value org.apache.hive.hcatalog.common.HCatConstants#HCAT_EVENT} = @@ -262,6 +303,8 @@ public void onDropTable(DropTableEvent tableEvent) throws MetaException { if (tableEvent.getStatus()) { Table table = tableEvent.getTable(); + // I think this is wrong, the drop table statement should come on the table topic not the + // DB topic - Alan. String topicName = getTopicPrefix(tableEvent.getHandler().getHiveConf()) + "." + table.getDbName().toLowerCase(); send(messageFactory.buildDropTableMessage(table), topicName); } @@ -435,14 +478,4 @@ public void onLoadPartitionDone(LoadPartitionDoneEvent lpde) // if(lpde.getStatus()) // send(lpde.getPartitionName(),lpde.getTable().getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME),HCatConstants.HCAT_PARTITION_DONE_EVENT); } - - @Override - public void onAlterPartition(AlterPartitionEvent ape) throws MetaException { - // no-op - } - - @Override - public void onAlterTable(AlterTableEvent ate) throws MetaException { - // no-op - } } diff --git hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterPartitionMessage.java hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterPartitionMessage.java new file mode 100644 index 0000000..7412b60 --- /dev/null +++ hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterPartitionMessage.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.messaging; + +import java.util.List; + +/** + * HCat message sent when a table is Altered. + */ +public abstract class AlterPartitionMessage extends HCatEventMessage { + + protected AlterPartitionMessage() { + super(EventType.ALTER_PARTITION); + } + + public abstract String getTable(); + + public abstract List getValues(); + + @Override + public HCatEventMessage checkValid() { + if (getTable() == null) throw new IllegalStateException("Table name unset."); + if (getValues() == null) throw new IllegalStateException("Partition values unset"); + return super.checkValid(); + } +} + diff --git hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterTableMessage.java hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterTableMessage.java new file mode 100644 index 0000000..0b58f29 --- /dev/null +++ hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterTableMessage.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.messaging; + +/** + * HCat message sent when a table is Altered. + */ +public abstract class AlterTableMessage extends HCatEventMessage { + + protected AlterTableMessage() { + super(EventType.ALTER_TABLE); + } + + public abstract String getTable(); + + @Override + public HCatEventMessage checkValid() { + if (getTable() == null) throw new IllegalStateException("Table name unset."); + return super.checkValid(); + } +} diff --git hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/HCatEventMessage.java hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/HCatEventMessage.java index 4d77057..4a8f290 100644 --- hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/HCatEventMessage.java +++ hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/HCatEventMessage.java @@ -37,7 +37,9 @@ CREATE_TABLE(HCatConstants.HCAT_CREATE_TABLE_EVENT), DROP_TABLE(HCatConstants.HCAT_DROP_TABLE_EVENT), ADD_PARTITION(HCatConstants.HCAT_ADD_PARTITION_EVENT), - DROP_PARTITION(HCatConstants.HCAT_DROP_PARTITION_EVENT); + DROP_PARTITION(HCatConstants.HCAT_DROP_PARTITION_EVENT), + ALTER_TABLE(HCatConstants.HCAT_ALTER_TABLE_EVENT), + ALTER_PARTITION(HCatConstants.HCAT_ALTER_PARTITION_EVENT); private String typeString; diff --git hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageDeserializer.java hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageDeserializer.java index a89c956..0bf3948 100644 --- hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageDeserializer.java +++ hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageDeserializer.java @@ -36,10 +36,14 @@ public HCatEventMessage getHCatEventMessage(String eventTypeString, String messa return getDropDatabaseMessage(messageBody); case CREATE_TABLE: return getCreateTableMessage(messageBody); + case ALTER_TABLE: + return getAlterTableMessage(messageBody); case DROP_TABLE: return getDropTableMessage(messageBody); case ADD_PARTITION: return getAddPartitionMessage(messageBody); + case ALTER_PARTITION: + return getAlterPartitionMessage(messageBody); case DROP_PARTITION: return getDropPartitionMessage(messageBody); @@ -64,6 +68,13 @@ public HCatEventMessage getHCatEventMessage(String eventTypeString, String messa public abstract CreateTableMessage getCreateTableMessage(String messageBody); /** + * Method to de-serialize AlterTableMessge + * @param messageBody string message + * @return object message + */ + public abstract AlterTableMessage getAlterTableMessage(String messageBody); + + /** * Method to de-serialize DropTableMessage instance. */ public abstract DropTableMessage getDropTableMessage(String messageBody); @@ -74,6 +85,13 @@ public HCatEventMessage getHCatEventMessage(String eventTypeString, String messa public abstract AddPartitionMessage getAddPartitionMessage(String messageBody); /** + * Method to deserialize AlterPartitionMessage + * @param messageBody the message in serialized form + * @return message in object form + */ + public abstract AlterPartitionMessage getAlterPartitionMessage(String messageBody); + + /** * Method to de-serialize DropPartitionMessage instance. */ public abstract DropPartitionMessage getDropPartitionMessage(String messageBody); diff --git hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java index 2dbcbe5..fcab469 100644 --- hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java +++ hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java @@ -119,6 +119,17 @@ public static MessageDeserializer getDeserializer(String format, public abstract CreateTableMessage buildCreateTableMessage(Table table); /** + * Factory method for AlterTableMessage. Unlike most of these calls, this one can return null, + * which means no message should be sent. This is because there are many flavors of alter + * table (add column, add partition, etc.). Some are covered elsewhere (like add partition) + * and some are not yet supported. + * @param before The table before the alter + * @param after The table after the alter + * @return + */ + public abstract AlterTableMessage buildAlterTableMessage(Table before, Table after); + + /** * Factory method for DropTableMessage. * @param table The Table being dropped. * @return DropTableMessage instance. @@ -144,6 +155,15 @@ public static MessageDeserializer getDeserializer(String format, public abstract AddPartitionMessage buildAddPartitionMessage(Table table, PartitionSpecProxy partitionSpec); /** + * Factory method for building AlterPartitionMessage + * @param before The partition before it was altered + * @param after The partition after it was altered + * @return a new AlterPartitionMessage + */ + public abstract AlterPartitionMessage buildAlterPartitionMessage(Partition before, + Partition after); + + /** * Factory method for DropPartitionMessage. * @param table The Table from which the partition is dropped. * @param partition The Partition being dropped. diff --git hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterPartitionMessage.java hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterPartitionMessage.java new file mode 100644 index 0000000..1e2456d --- /dev/null +++ hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterPartitionMessage.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.messaging.json; + +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hive.hcatalog.common.HCatConstants; +import org.apache.hive.hcatalog.messaging.AlterPartitionMessage; +import org.apache.hive.hcatalog.messaging.AlterTableMessage; +import org.codehaus.jackson.annotate.JsonProperty; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * JSON alter table message + */ +public class JSONAlterPartitionMessage extends AlterPartitionMessage { + + @JsonProperty + String server, servicePrincipal, db, table; + + @JsonProperty + Long timestamp; + + @JsonProperty + List values; + + + public JSONAlterPartitionMessage(String server, + String servicePrincipal, + String db, + String table, + List values, + Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = db; + this.table = table; + this.timestamp = timestamp; + this.values = values; + checkValid(); + } + + + @Override + public String getServer() { + return server; + } + + @Override + public String getServicePrincipal() { + return servicePrincipal; + } + + @Override + public String getDB() { + return db; + } + + @Override + public Long getTimestamp() { + return timestamp; + } + + @Override + public String getTable() { + return table; + } + + @Override + public List getValues() { + return values; + } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } catch (Exception e) { + throw new IllegalArgumentException("Could not serialize: ", e); + } + } +} diff --git hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterTableMessage.java hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterTableMessage.java new file mode 100644 index 0000000..2848843 --- /dev/null +++ hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterTableMessage.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.messaging.json; + +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hive.hcatalog.common.HCatConstants; +import org.apache.hive.hcatalog.messaging.AlterTableMessage; +import org.codehaus.jackson.annotate.JsonProperty; + +import java.util.ArrayList; +import java.util.List; + +/** + * JSON alter table message + */ +public class JSONAlterTableMessage extends AlterTableMessage { + + @JsonProperty + String server, servicePrincipal, db, table; + + @JsonProperty + Long timestamp; + + public JSONAlterTableMessage(String server, + String servicePrincipal, + String db, + String table, + Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = db; + this.table = table; + this.timestamp = timestamp; + checkValid(); + } + + + @Override + public String getServer() { + return server; + } + + @Override + public String getServicePrincipal() { + return servicePrincipal; + } + + @Override + public String getDB() { + return db; + } + + @Override + public Long getTimestamp() { + return timestamp; + } + + @Override + public String getTable() { + return table; + } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } catch (Exception e) { + throw new IllegalArgumentException("Could not serialize: ", e); + } + } +} diff --git hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageDeserializer.java hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageDeserializer.java index 8a4538a..f33ca2f 100644 --- hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageDeserializer.java +++ hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageDeserializer.java @@ -19,7 +19,10 @@ package org.apache.hive.hcatalog.messaging.json; +import org.apache.hive.hcatalog.common.HCatConstants; import org.apache.hive.hcatalog.messaging.AddPartitionMessage; +import org.apache.hive.hcatalog.messaging.AlterPartitionMessage; +import org.apache.hive.hcatalog.messaging.AlterTableMessage; import org.apache.hive.hcatalog.messaging.CreateDatabaseMessage; import org.apache.hive.hcatalog.messaging.CreateTableMessage; import org.apache.hive.hcatalog.messaging.DropDatabaseMessage; @@ -71,6 +74,17 @@ public CreateTableMessage getCreateTableMessage(String messageBody) { } @Override + public AlterTableMessage getAlterTableMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONAlterTableMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct appropriate alter table type.", + exception); + } + } + + @Override public DropTableMessage getDropTableMessage(String messageBody) { try { return mapper.readValue(messageBody, JSONDropTableMessage.class); @@ -91,6 +105,15 @@ public AddPartitionMessage getAddPartitionMessage(String messageBody) { } @Override + public AlterPartitionMessage getAlterPartitionMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONAlterPartitionMessage.class); + } catch (Exception e) { + throw new IllegalArgumentException("Could not construct AlterPartitionMessage.", e); + } + } + + @Override public DropPartitionMessage getDropPartitionMessage(String messageBody) { try { return mapper.readValue(messageBody, JSONDropPartitionMessage.class); diff --git hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java index 5a475a1..19695ed 100644 --- hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java +++ hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java @@ -24,13 +24,12 @@ import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.common.classification.InterfaceStability; import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; import org.apache.hive.hcatalog.messaging.AddPartitionMessage; -// TODO, once HIVE-9175 is committed -// import org.apache.hive.hcatalog.messaging.AlterTableMessage; +import org.apache.hive.hcatalog.messaging.AlterPartitionMessage; +import org.apache.hive.hcatalog.messaging.AlterTableMessage; import org.apache.hive.hcatalog.messaging.CreateDatabaseMessage; import org.apache.hive.hcatalog.messaging.CreateTableMessage; import org.apache.hive.hcatalog.messaging.DropDatabaseMessage; @@ -85,14 +84,11 @@ public CreateTableMessage buildCreateTableMessage(Table table) { table.getTableName(), now()); } - // TODO Once HIVE-9175 is committed - /* @Override public AlterTableMessage buildAlterTableMessage(Table before, Table after) { return new JSONAlterTableMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, before.getDbName(), before.getTableName(), now()); } - */ @Override public DropTableMessage buildDropTableMessage(Table table) { @@ -111,7 +107,13 @@ public AddPartitionMessage buildAddPartitionMessage(Table table, List @InterfaceStability.Evolving public AddPartitionMessage buildAddPartitionMessage(Table table, PartitionSpecProxy partitionSpec) { return new JSONAddPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table.getDbName(), - table.getTableName(), getPartitionKeyValues(table, partitionSpec), System.currentTimeMillis()/1000); + table.getTableName(), getPartitionKeyValues(table, partitionSpec), now()); + } + + @Override + public AlterPartitionMessage buildAlterPartitionMessage(Partition before, Partition after) { + return new JSONAlterPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, + before.getDbName(), before.getTableName(), before.getValues(), now()); } @Override diff --git hcatalog/server-extensions/src/test/java/org/apache/hive/hcatalog/listener/TestNotificationListener.java hcatalog/server-extensions/src/test/java/org/apache/hive/hcatalog/listener/TestNotificationListener.java index da1ae62..bf61dcf 100644 --- hcatalog/server-extensions/src/test/java/org/apache/hive/hcatalog/listener/TestNotificationListener.java +++ hcatalog/server-extensions/src/test/java/org/apache/hive/hcatalog/listener/TestNotificationListener.java @@ -46,6 +46,8 @@ import org.apache.hive.hcatalog.mapreduce.HCatBaseTest; import org.apache.hive.hcatalog.messaging.AddPartitionMessage; +import org.apache.hive.hcatalog.messaging.AlterPartitionMessage; +import org.apache.hive.hcatalog.messaging.AlterTableMessage; import org.apache.hive.hcatalog.messaging.CreateDatabaseMessage; import org.apache.hive.hcatalog.messaging.CreateTableMessage; import org.apache.hive.hcatalog.messaging.DropDatabaseMessage; @@ -104,7 +106,9 @@ public void tearDown() throws Exception { HCatConstants.HCAT_CREATE_DATABASE_EVENT, HCatConstants.HCAT_CREATE_TABLE_EVENT, HCatConstants.HCAT_ADD_PARTITION_EVENT, + HCatConstants.HCAT_ALTER_PARTITION_EVENT, HCatConstants.HCAT_DROP_PARTITION_EVENT, + HCatConstants.HCAT_ALTER_TABLE_EVENT, HCatConstants.HCAT_DROP_TABLE_EVENT, HCatConstants.HCAT_DROP_DATABASE_EVENT); Assert.assertEquals(expectedMessages, actualMessages); @@ -120,7 +124,9 @@ public void testAMQListener() throws Exception { kvs.put("b", "2011"); client.markPartitionForEvent("mydb", "mytbl", kvs, PartitionEventType.LOAD_DONE); + driver.run("alter table mytbl partition (b='2011') set fileformat orc"); driver.run("alter table mytbl drop partition(b='2011')"); + driver.run("alter table mytbl add columns (c int comment 'this is an int', d decimal(3,2))"); driver.run("drop table mytbl"); driver.run("drop database mydb"); } @@ -170,6 +176,20 @@ public void onMessage(Message msg) { Assert.assertEquals("mytbl", ((AddPartitionMessage) message2).getTable()); Assert.assertEquals(1, ((AddPartitionMessage) message2).getPartitions().size()); Assert.assertEquals("2011", ((AddPartitionMessage) message2).getPartitions().get(0).get("b")); + } else if (event.equals(HCatConstants.HCAT_ALTER_PARTITION_EVENT)) { + Assert.assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination().toString()); + // for alter partition events + AlterPartitionMessage message = deserializer.getAlterPartitionMessage(messageBody); + Assert.assertEquals("mytbl", message.getTable()); + Assert.assertEquals("mydb", message.getDB()); + Assert.assertEquals(1, message.getValues().size()); + Assert.assertEquals("2011", message.getValues().get(0)); + HCatEventMessage message2 = MessagingUtils.getMessage(msg); + Assert.assertTrue("Unexpected message-type.", message2 instanceof AlterPartitionMessage); + Assert.assertEquals("mydb", message2.getDB()); + Assert.assertEquals("mytbl", ((AlterPartitionMessage) message2).getTable()); + Assert.assertEquals(1, ((AlterPartitionMessage) message2).getValues().size()); + Assert.assertEquals("2011", ((AlterPartitionMessage) message2).getValues().get(0)); } else if (event.equals(HCatConstants.HCAT_DROP_PARTITION_EVENT)) { Assert.assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination() @@ -184,7 +204,8 @@ public void onMessage(Message msg) { Assert.assertEquals("mydb", message2.getDB()); Assert.assertEquals("mytbl", ((DropPartitionMessage) message2).getTable()); Assert.assertEquals(1, ((DropPartitionMessage) message2).getPartitions().size()); - Assert.assertEquals("2011", ((DropPartitionMessage) message2).getPartitions().get(0).get("b")); + Assert.assertEquals("2011", ((DropPartitionMessage) message2).getPartitions().get(0).get( + "b")); } else if (event.equals(HCatConstants.HCAT_DROP_TABLE_EVENT)) { Assert.assertEquals("topic://hcat.mydb", msg.getJMSDestination().toString()); @@ -199,11 +220,20 @@ public void onMessage(Message msg) { Assert.assertEquals("topic://" + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg .getJMSDestination().toString()); - DropDatabaseMessage message = deserializer.getDropDatabaseMessage(messageBody); + DropDatabaseMessage message = deserializer.getDropDatabaseMessage(messageBody); Assert.assertEquals("mydb", message.getDB()); HCatEventMessage message2 = MessagingUtils.getMessage(msg); Assert.assertTrue("Unexpected message-type.", message2 instanceof DropDatabaseMessage); Assert.assertEquals("mydb", message2.getDB()); + } else if (event.equals(HCatConstants.HCAT_ALTER_TABLE_EVENT)) { + Assert.assertEquals("topic://hcat.mydb", msg.getJMSDestination().toString()); + AlterTableMessage message = deserializer.getAlterTableMessage(messageBody); + Assert.assertEquals("mytbl", message.getTable()); + Assert.assertEquals("mydb", message.getDB()); + HCatEventMessage message2 = MessagingUtils.getMessage(msg); + Assert.assertTrue("Unexpected message-type.", message2 instanceof AlterTableMessage); + Assert.assertEquals("mydb", message2.getDB()); + Assert.assertEquals("mytbl", ((AlterTableMessage) message2).getTable()); } else if (event.equals(HCatConstants.HCAT_PARTITION_DONE_EVENT)) { // TODO: Fill in when PARTITION_DONE_EVENT is supported. Assert.assertTrue("Unexpected: HCAT_PARTITION_DONE_EVENT not supported (yet).", false); diff --git itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java index a847def..c10fd7f 100644 --- itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java +++ itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java @@ -138,8 +138,6 @@ public void createTable() throws Exception { "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":\"mytable\",\"timestamp\":[0-9]+}")); } - // TODO After HIVE-9175 is committed - /* @Test public void alterTable() throws Exception { List cols = new ArrayList(); @@ -147,12 +145,12 @@ public void alterTable() throws Exception { SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, serde, null, null, emptyParameters); - Table table = new Table("alttable", "default", "me", startTime, startTime, 0, sd, null, - emptyParameters, null, null, null); + Table table = new Table("alttable", "default", "me", startTime, startTime, 0, sd, + new ArrayList(), emptyParameters, null, null, null); msClient.createTable(table); - table = new Table("alttable", "default", "me", startTime, startTime + 1, 0, sd, null, - emptyParameters, null, null, null); + table = new Table("alttable", "default", "me", startTime, startTime + 1, 0, sd, + new ArrayList(), emptyParameters, null, null, null); msClient.alter_table("default", "alttable", table); NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null); @@ -168,7 +166,6 @@ public void alterTable() throws Exception { "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":\"alttable\"," + "\"timestamp\":[0-9]+}")); } - */ @Test public void dropTable() throws Exception { @@ -228,6 +225,41 @@ public void addPartition() throws Exception { } @Test + public void alterPartition() throws Exception { + List cols = new ArrayList(); + cols.add(new FieldSchema("col1", "int", "nocomment")); + List partCols = new ArrayList(); + partCols.add(new FieldSchema("ds", "string", "")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, + serde, null, null, emptyParameters); + Table table = new Table("alterparttable", "default", "me", startTime, startTime, 0, sd, + partCols, emptyParameters, null, null, null); + msClient.createTable(table); + + Partition partition = new Partition(Arrays.asList("today"), "default", "alterparttable", + startTime, startTime, sd, emptyParameters); + msClient.add_partition(partition); + + Partition newPart = new Partition(Arrays.asList("today"), "default", "alterparttable", + startTime, startTime + 1, sd, emptyParameters); + msClient.alter_partition("default", "alterparttable", newPart); + + NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null); + assertEquals(3, rsp.getEventsSize()); + + NotificationEvent event = rsp.getEvents().get(2); + assertEquals(firstEventId + 3, event.getEventId()); + assertTrue(event.getEventTime() >= startTime); + assertEquals(HCatConstants.HCAT_ALTER_PARTITION_EVENT, event.getEventType()); + assertEquals("default", event.getDbName()); + assertEquals("alterparttable", event.getTableName()); + assertTrue(event.getMessage().matches( "\\{\"eventType\":\"ALTER_PARTITION\",\"server\":\"\"," + + "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":\"alterparttable\"," + + "\"timestamp\":[0-9]+,\"values\":\\[\"today\"]}")); + } + + @Test public void dropPartition() throws Exception { List cols = new ArrayList(); cols.add(new FieldSchema("col1", "int", "nocomment")); diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 271b4ad..8c16a93 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -3123,9 +3123,14 @@ private void rename_partition(final String db_name, final String tbl_name, oldPart = alterHandler.alterPartition(getMS(), wh, db_name, tbl_name, part_vals, new_part); + // Only fetch the table if we actually have a listener + Table table = null; for (MetaStoreEventListener listener : listeners) { + if (table == null) { + table = getMS().getTable(db_name, tbl_name); + } AlterPartitionEvent alterPartitionEvent = - new AlterPartitionEvent(oldPart, new_part, true, this); + new AlterPartitionEvent(oldPart, new_part, table, true, this); alterPartitionEvent.setEnvironmentContext(envContext); listener.onAlterPartition(alterPartitionEvent); } @@ -3176,6 +3181,8 @@ public void alter_partitions(final String db_name, final String tbl_name, oldParts = alterHandler.alterPartitions(getMS(), wh, db_name, tbl_name, new_parts); Iterator olditr = oldParts.iterator(); + // Only fetch the table if we have a listener that needs it. + Table table = null; for (Partition tmpPart : new_parts) { Partition oldTmpPart = null; if (olditr.hasNext()) { @@ -3185,8 +3192,11 @@ public void alter_partitions(final String db_name, final String tbl_name, throw new InvalidOperationException("failed to alterpartitions"); } for (MetaStoreEventListener listener : listeners) { + if (table == null) { + table = getMS().getTable(db_name, tbl_name); + } AlterPartitionEvent alterPartitionEvent = - new AlterPartitionEvent(oldTmpPart, tmpPart, true, this); + new AlterPartitionEvent(oldTmpPart, tmpPart, table, true, this); listener.onAlterPartition(alterPartitionEvent); } } diff --git metastore/src/java/org/apache/hadoop/hive/metastore/events/AlterPartitionEvent.java metastore/src/java/org/apache/hadoop/hive/metastore/events/AlterPartitionEvent.java index fd0c4af..8edb50b 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/events/AlterPartitionEvent.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/events/AlterPartitionEvent.java @@ -20,17 +20,20 @@ import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; public class AlterPartitionEvent extends ListenerEvent { private final Partition oldPart; private final Partition newPart; + private final Table table; - public AlterPartitionEvent(Partition oldPart, Partition newPart, + public AlterPartitionEvent(Partition oldPart, Partition newPart, Table table, boolean status, HMSHandler handler) { super(status, handler); this.oldPart = oldPart; this.newPart = newPart; + this.table = table; } /** @@ -47,4 +50,12 @@ public Partition getOldPartition() { public Partition getNewPartition() { return newPart; } + + /** + * Get the table this partition is in + * @return + */ + public Table getTable() { + return table; + } }