diff --git hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java index 4dee79b..65e8bc7 100644 --- hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java +++ hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.common.classification.InterfaceStability; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.PartitionEventType; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hive.hcatalog.common.HCatException; @@ -467,6 +468,31 @@ public abstract void cancelDelegationToken(String tokenStrForm) public abstract String getMessageBusTopicName(String dbName, String tableName) throws HCatException; /** + * Get a list of notifications + * @param lastEventId The last event id that was consumed by this reader. The returned + * notifications will start at the next eventId available this eventId that + * matches the filter. + * @param maxEvents Maximum number of events to return. If < 1, then all available events will + * be returned. + * @param filter Filter to determine if message should be accepted. If null, then all + * available events up to maxEvents will be returned. + * @return list of notifications, sorted by eventId. It is guaranteed that the events are in + * the order that the operations were done on the database. + * @throws HCatException + */ + public abstract List getNextNotification(long lastEventId, + int maxEvents, + IMetaStoreClient.NotificationFilter filter) + throws HCatException; + + /** + * Get the most recently used notification id. + * @return + * @throws HCatException + */ + public abstract long getCurrentNotificationEventId() throws HCatException; + + /** * Close the hcatalog client. * * @throws HCatException diff --git hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java index 7e81113..cd05254 100644 --- hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java +++ hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java @@ -23,20 +23,25 @@ import java.util.List; import java.util.Map; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.common.classification.InterfaceStability; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; +import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.InvalidObjectException; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.PartitionEventType; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; @@ -825,7 +830,8 @@ public int addPartitionSpec(HCatPartitionSpec partitionSpec) throws HCatExceptio @Override public String getMessageBusTopicName(String dbName, String tableName) throws HCatException { try { - return hmsClient.getTable(dbName, tableName).getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME); + return hmsClient.getTable(dbName, tableName).getParameters().get( + HCatConstants.HCAT_MSGBUS_TOPIC_NAME); } catch (MetaException e) { throw new HCatException("MetaException while retrieving JMS Topic name.", e); @@ -836,7 +842,36 @@ public String getMessageBusTopicName(String dbName, String tableName) throws HCa "TException while retrieving JMS Topic name.", e); } } - + + @Override + public List getNextNotification(long lastEventId, int maxEvents, + IMetaStoreClient.NotificationFilter filter) + throws HCatException { + try { + List events = new ArrayList(); + NotificationEventResponse rsp = hmsClient.getNextNotification(lastEventId, maxEvents, filter); + if (rsp != null && rsp.getEvents() != null) { + for (NotificationEvent event : rsp.getEvents()) { + events.add(new HCatNotificationEvent(event)); + } + } + return events; + } catch (TException e) { + throw new ConnectionFailureException("TException while getting notifications", e); + } + } + + @Override + public long getCurrentNotificationEventId() throws HCatException { + try { + CurrentNotificationEventId id = hmsClient.getCurrentNotificationEventId(); + return id.getEventId(); + } catch (TException e) { + throw new ConnectionFailureException("TException while getting current notification event " + + "id " , e); + } + } + @Override public String serializeTable(HCatTable hcatTable) throws HCatException { return MetadataSerializer.get().serializeTable(hcatTable); @@ -905,8 +940,10 @@ public HCatPartition deserializePartition(String hcatPartitionStringRep) throws @Override public HCatPartitionSpec deserializePartitionSpec(List hcatPartitionSpecStrings) throws HCatException { - HCatPartitionSpec hcatPartitionSpec = MetadataSerializer.get().deserializePartitionSpec(hcatPartitionSpecStrings); - hcatPartitionSpec.hcatTable(getTable(hcatPartitionSpec.getDbName(), hcatPartitionSpec.getTableName())); + HCatPartitionSpec hcatPartitionSpec = MetadataSerializer.get() + .deserializePartitionSpec(hcatPartitionSpecStrings); + hcatPartitionSpec + .hcatTable(getTable(hcatPartitionSpec.getDbName(), hcatPartitionSpec.getTableName())); return hcatPartitionSpec; } } diff --git hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatNotificationEvent.java hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatNotificationEvent.java new file mode 100644 index 0000000..9205c56 --- /dev/null +++ hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatNotificationEvent.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.api; + +import org.apache.hadoop.hive.metastore.api.NotificationEvent; + +/** + * A wrapper class for {@link org.apache.hadoop.hive.metastore.api.NotificationEvent}, + * so that if that class changes we can still keep this one constant for backward compatibility + */ +public class HCatNotificationEvent { + private long eventId; + private int eventTime; + private String eventType; + private String dbName; + private String tableName; + private String message; + + HCatNotificationEvent(NotificationEvent event) { + eventId = event.getEventId(); + eventTime = event.getEventTime(); + eventType = event.getEventType(); + dbName = event.getDbName(); + tableName = event.getTableName(); + message = event.getMessage(); + } + + public long getEventId() { + return eventId; + } + + public int getEventTime() { + return eventTime; + } + + public String getEventType() { + return eventType; + } + + public String getDbName() { + return dbName; + } + + public String getTableName() { + return tableName; + } + + public String getMessage() { + return message; + } + + @Override + public String toString() { + StringBuilder buf = new StringBuilder(); + buf.append("eventId:"); + buf.append(eventId); + buf.append(" eventTime:"); + buf.append(eventTime); + buf.append(" eventType:<"); + buf.append(eventType); + buf.append("> dbName:<"); + buf.append(dbName); + buf.append("> tableName:<"); + buf.append(tableName); + buf.append("> message:<"); + buf.append(message); + buf.append(">"); + return buf.toString(); + } +} diff --git itests/hcatalog-unit/pom.xml itests/hcatalog-unit/pom.xml index eac2ae4..519639a 100644 --- itests/hcatalog-unit/pom.xml +++ itests/hcatalog-unit/pom.xml @@ -66,6 +66,12 @@ test + org.apache.hive.hcatalog + hive-webhcat-java-client + ${project.version} + test + + org.apache.hive hive-hbase-handler ${project.version} diff --git itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java new file mode 100644 index 0000000..d730eca --- /dev/null +++ itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java @@ -0,0 +1,268 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.api; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertNull; +import static junit.framework.Assert.assertTrue; +import static junit.framework.Assert.fail; + +import junit.framework.Assert; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hive.hcatalog.common.HCatConstants; +import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; +import org.apache.hive.hcatalog.listener.DbNotificationListener; +import org.apache.hive.hcatalog.messaging.HCatEventMessage; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * This can't use TestHCatClient because it has to have control over certain conf variables when + * the metastore is started. Plus, we don't need a metastore running in another thread. The + * local one is fine. + */ +public class TestHCatClientNotification { + + private static final Log LOG = LogFactory.getLog(TestHCatClientNotification.class.getName()); + private static HCatClient hCatClient; + private int startTime; + private long firstEventId; + + @BeforeClass + public static void setupClient() throws Exception { + HiveConf conf = new HiveConf(); conf.setVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS, + DbNotificationListener.class.getName()); + hCatClient = HCatClient.create(conf); + } + + @Before + public void setup() throws Exception { + long now = System.currentTimeMillis() / 1000; + startTime = 0; + if (now > Integer.MAX_VALUE) fail("Bummer, time has fallen over the edge"); + else startTime = (int)now; + firstEventId = hCatClient.getCurrentNotificationEventId(); + } + + @Test + public void createDatabase() throws Exception { + hCatClient.createDatabase(HCatCreateDBDesc.create("myhcatdb").build()); + List events = hCatClient.getNextNotification(firstEventId, 0, null); + assertEquals(1, events.size()); + + HCatNotificationEvent event = events.get(0); + assertEquals(firstEventId + 1, event.getEventId()); + assertTrue(event.getEventTime() >= startTime); + assertEquals(HCatConstants.HCAT_CREATE_DATABASE_EVENT, event.getEventType()); + assertNull(event.getDbName()); + assertNull(event.getTableName()); + assertTrue(event.getMessage().matches("\\{\"eventType\":\"CREATE_DATABASE\",\"server\":\"\"," + + "\"servicePrincipal\":\"\",\"db\":\"myhcatdb\",\"timestamp\":[0-9]+}")); + } + + @Test + public void dropDatabase() throws Exception { + String dbname = "hcatdropdb"; + hCatClient.createDatabase(HCatCreateDBDesc.create(dbname).build()); + hCatClient.dropDatabase(dbname, false, HCatClient.DropDBMode.RESTRICT); + + List events = hCatClient.getNextNotification(firstEventId, 0, null); + assertEquals(2, events.size()); + + HCatNotificationEvent event = events.get(1); + assertEquals(firstEventId + 2, event.getEventId()); + assertTrue(event.getEventTime() >= startTime); + assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT, event.getEventType()); + assertEquals(dbname, event.getDbName()); + assertNull(event.getTableName()); + assertTrue(event.getMessage().matches("\\{\"eventType\":\"DROP_DATABASE\",\"server\":\"\"," + + "\"servicePrincipal\":\"\",\"db\":\"hcatdropdb\",\"timestamp\":[0-9]+}")); + } + + @Test + public void createTable() throws Exception { + String dbName = "default"; + String tableName = "hcatcreatetable"; + HCatTable table = new HCatTable(dbName, tableName); + table.cols(Arrays.asList(new HCatFieldSchema("onecol", TypeInfoFactory.stringTypeInfo, ""))); + hCatClient.createTable(HCatCreateTableDesc.create(table).build()); + + List events = hCatClient.getNextNotification(firstEventId, 0, null); + assertEquals(1, events.size()); + + HCatNotificationEvent event = events.get(0); + assertEquals(firstEventId + 1, event.getEventId()); + assertTrue(event.getEventTime() >= startTime); + assertEquals(HCatConstants.HCAT_CREATE_TABLE_EVENT, event.getEventType()); + assertEquals(dbName, event.getDbName()); + assertNull(event.getTableName()); + assertTrue(event.getMessage().matches("\\{\"eventType\":\"CREATE_TABLE\",\"server\":\"\"," + + "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":\"hcatcreatetable\",\"timestamp\":[0-9]+}")); + } + + // TODO - Currently no way to test alter table, as this interface doesn't support alter table + + @Test + public void dropTable() throws Exception { + String dbName = "default"; + String tableName = "hcatdroptable"; + HCatTable table = new HCatTable(dbName, tableName); + table.cols(Arrays.asList(new HCatFieldSchema("onecol", TypeInfoFactory.stringTypeInfo, ""))); + hCatClient.createTable(HCatCreateTableDesc.create(table).build()); + hCatClient.dropTable(dbName, tableName, false); + + List events = hCatClient.getNextNotification(firstEventId, 0, null); + assertEquals(2, events.size()); + + HCatNotificationEvent event = events.get(1); + assertEquals(firstEventId + 2, event.getEventId()); + assertTrue(event.getEventTime() >= startTime); + assertEquals(HCatConstants.HCAT_DROP_TABLE_EVENT, event.getEventType()); + assertEquals(dbName, event.getDbName()); + assertEquals(tableName, event.getTableName()); + assertTrue(event.getMessage().matches("\\{\"eventType\":\"DROP_TABLE\",\"server\":\"\"," + + "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" + + "\"hcatdroptable\",\"timestamp\":[0-9]+}")); + } + + @Test + public void addPartition() throws Exception { + String dbName = "default"; + String tableName = "hcataddparttable"; + String partColName = "pc"; + HCatTable table = new HCatTable(dbName, tableName); + table.partCol(new HCatFieldSchema(partColName, TypeInfoFactory.stringTypeInfo, "")); + table.cols(Arrays.asList(new HCatFieldSchema("onecol", TypeInfoFactory.stringTypeInfo, ""))); + hCatClient.createTable(HCatCreateTableDesc.create(table).build()); + String partName = "testpart"; + Map partSpec = new HashMap(1); + partSpec.put(partColName, partName); + hCatClient.addPartition( + HCatAddPartitionDesc.create( + new HCatPartition(table, partSpec, null) + ).build() + ); + + List events = hCatClient.getNextNotification(firstEventId, 0, null); + assertEquals(2, events.size()); + + HCatNotificationEvent event = events.get(1); + assertEquals(firstEventId + 2, event.getEventId()); + assertTrue(event.getEventTime() >= startTime); + assertEquals(HCatConstants.HCAT_ADD_PARTITION_EVENT, event.getEventType()); + assertEquals("default", event.getDbName()); + assertEquals(tableName, event.getTableName()); + assertTrue(event.getMessage().matches("\\{\"eventType\":\"ADD_PARTITION\",\"server\":\"\"," + + "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" + + "\"hcataddparttable\",\"timestamp\":[0-9]+,\"partitions\":\\[\\{\"pc\":\"testpart\"}]}")); + } + + // TODO - currently no way to test alter partition, as HCatClient doesn't support it. + @Test + public void dropPartition() throws Exception { + String dbName = "default"; + String tableName = "hcatdropparttable"; + String partColName = "pc"; + HCatTable table = new HCatTable(dbName, tableName); + table.partCol(new HCatFieldSchema(partColName, TypeInfoFactory.stringTypeInfo, "")); + table.cols(Arrays.asList(new HCatFieldSchema("onecol", TypeInfoFactory.stringTypeInfo, ""))); + hCatClient.createTable(HCatCreateTableDesc.create(table).build()); + String partName = "testpart"; + Map partSpec = new HashMap(1); + partSpec.put(partColName, partName); + hCatClient.addPartition( + HCatAddPartitionDesc.create( + new HCatPartition(table, partSpec, null) + ).build() + ); + hCatClient.dropPartitions(dbName, tableName, partSpec, false); + + List events = hCatClient.getNextNotification(firstEventId, 0, null); + assertEquals(3, events.size()); + + HCatNotificationEvent event = events.get(2); + assertEquals(firstEventId + 3, event.getEventId()); + assertTrue(event.getEventTime() >= startTime); + assertEquals(HCatConstants.HCAT_DROP_PARTITION_EVENT, event.getEventType()); + assertEquals("default", event.getDbName()); + assertEquals(tableName, event.getTableName()); + assertTrue(event.getMessage().matches("\\{\"eventType\":\"DROP_PARTITION\",\"server\":\"\"," + + "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" + + "\"hcatdropparttable\",\"timestamp\":[0-9]+,\"partitions\":\\[\\{\"pc\":\"testpart\"}]}")); + } + + @Test + public void getOnlyMaxEvents() throws Exception { + hCatClient.createDatabase(HCatCreateDBDesc.create("hcatdb1").build()); + hCatClient.createDatabase(HCatCreateDBDesc.create("hcatdb2").build()); + hCatClient.createDatabase(HCatCreateDBDesc.create("hcatdb3").build()); + + List events = hCatClient.getNextNotification(firstEventId, 2, null); + assertEquals(2, events.size()); + assertEquals(firstEventId + 1, events.get(0).getEventId()); + assertEquals(firstEventId + 2, events.get(1).getEventId()); + } + + @Test + public void filter() throws Exception { + hCatClient.createDatabase(HCatCreateDBDesc.create("hcatf1").build()); + hCatClient.createDatabase(HCatCreateDBDesc.create("hcatf2").build()); + hCatClient.dropDatabase("hcatf2", false, HCatClient.DropDBMode.RESTRICT); + + IMetaStoreClient.NotificationFilter filter = new IMetaStoreClient.NotificationFilter() { + @Override + public boolean accept(NotificationEvent event) { + return event.getEventType().equals(HCatConstants.HCAT_DROP_DATABASE_EVENT); + } + }; + List events = hCatClient.getNextNotification(firstEventId, 0, filter); + assertEquals(1, events.size()); + assertEquals(firstEventId + 3, events.get(0).getEventId()); + } + + @Test + public void filterWithMax() throws Exception { + hCatClient.createDatabase(HCatCreateDBDesc.create("hcatm1").build()); + hCatClient.createDatabase(HCatCreateDBDesc.create("hcatm2").build()); + hCatClient.dropDatabase("hcatm2", false, HCatClient.DropDBMode.RESTRICT); + + IMetaStoreClient.NotificationFilter filter = new IMetaStoreClient.NotificationFilter() { + @Override + public boolean accept(NotificationEvent event) { + return event.getEventType().equals(HCatConstants.HCAT_CREATE_DATABASE_EVENT); + } + }; + List events = hCatClient.getNextNotification(firstEventId, 1, filter); + assertEquals(1, events.size()); + assertEquals(firstEventId + 1, events.get(0).getEventId()); + } +}