diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 9e4c590..9a5d604 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -250,6 +250,7 @@ private static URL checkConfigFile(File f) { HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS, HiveConf.ConfVars.METASTORE_EVENT_CLEAN_FREQ, HiveConf.ConfVars.METASTORE_EVENT_EXPIRY_DURATION, + HiveConf.ConfVars.METASTORE_EVENT_MESSAGE_FACTORY, HiveConf.ConfVars.METASTORE_FILTER_HOOK, HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL, HiveConf.ConfVars.METASTORE_END_FUNCTION_LISTENERS, @@ -794,6 +795,9 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal METASTORE_EVENT_EXPIRY_DURATION("hive.metastore.event.expiry.duration", "0s", new TimeValidator(TimeUnit.SECONDS), "Duration after which events expire from events table"), + METASTORE_EVENT_MESSAGE_FACTORY("hive.metastore.event.message.factory", + "org.apache.hadoop.hive.metastore.messaging.json.JSONMessageFactory", + "Factory class for making encoding and decoding messages in the events generated."), METASTORE_EXECUTE_SET_UGI("hive.metastore.execute.setugi", true, "In unsecure mode, setting this property to true will cause the metastore to execute DFS operations using \n" + "the client's reported user and group permissions. Note that this property must be set on \n" + diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index ea7520d..494d01f 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -17,41 +17,42 @@ */ package org.apache.hive.hcatalog.listener; -import org.apache.hadoop.hive.metastore.api.Function; -import org.apache.hadoop.hive.metastore.api.Index; -import org.apache.hadoop.hive.metastore.events.AddIndexEvent; -import org.apache.hadoop.hive.metastore.events.AlterIndexEvent; -import org.apache.hadoop.hive.metastore.events.CreateFunctionEvent; -import org.apache.hadoop.hive.metastore.events.DropFunctionEvent; -import org.apache.hadoop.hive.metastore.events.DropIndexEvent; +import java.util.concurrent.TimeUnit; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.MetaStoreEventListener; import org.apache.hadoop.hive.metastore.RawStore; import org.apache.hadoop.hive.metastore.RawStoreProxy; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hadoop.hive.metastore.api.Index; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.events.AddIndexEvent; import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; +import org.apache.hadoop.hive.metastore.events.AlterIndexEvent; import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; import org.apache.hadoop.hive.metastore.events.AlterTableEvent; import org.apache.hadoop.hive.metastore.events.ConfigChangeEvent; import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent; +import org.apache.hadoop.hive.metastore.events.CreateFunctionEvent; import org.apache.hadoop.hive.metastore.events.CreateTableEvent; import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent; +import org.apache.hadoop.hive.metastore.events.DropFunctionEvent; +import org.apache.hadoop.hive.metastore.events.DropIndexEvent; import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; import org.apache.hadoop.hive.metastore.events.DropTableEvent; import org.apache.hadoop.hive.metastore.events.InsertEvent; import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent; -import org.apache.hive.hcatalog.common.HCatConstants; -import org.apache.hive.hcatalog.messaging.MessageFactory; +import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType; +import org.apache.hadoop.hive.metastore.messaging.MessageFactory; +import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.TimeUnit; - /** * An implementation of {@link org.apache.hadoop.hive.metastore.MetaStoreEventListener} that * stores events in the database. @@ -121,10 +122,11 @@ public void onConfigChange(ConfigChangeEvent tableEvent) throws MetaException { * @param tableEvent table event. * @throws MetaException */ - public void onCreateTable (CreateTableEvent tableEvent) throws MetaException { + public void onCreateTable(CreateTableEvent tableEvent) throws MetaException { Table t = tableEvent.getTable(); - NotificationEvent event = new NotificationEvent(0, now(), - HCatConstants.HCAT_CREATE_TABLE_EVENT, msgFactory.buildCreateTableMessage(t).toString()); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.CREATE_TABLE.toString(), msgFactory + .buildCreateTableMessage(t).toString()); event.setDbName(t.getDbName()); event.setTableName(t.getTableName()); enqueue(event); @@ -134,10 +136,11 @@ public void onCreateTable (CreateTableEvent tableEvent) throws MetaException { * @param tableEvent table event. * @throws MetaException */ - public void onDropTable (DropTableEvent tableEvent) throws MetaException { + public void onDropTable(DropTableEvent tableEvent) throws MetaException { Table t = tableEvent.getTable(); - NotificationEvent event = new NotificationEvent(0, now(), - HCatConstants.HCAT_DROP_TABLE_EVENT, msgFactory.buildDropTableMessage(t).toString()); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.DROP_TABLE.toString(), msgFactory + .buildDropTableMessage(t).toString()); event.setDbName(t.getDbName()); event.setTableName(t.getTableName()); enqueue(event); @@ -147,12 +150,12 @@ public void onDropTable (DropTableEvent tableEvent) throws MetaException { * @param tableEvent alter table event * @throws MetaException */ - public void onAlterTable (AlterTableEvent tableEvent) throws MetaException { + public void onAlterTable(AlterTableEvent tableEvent) throws MetaException { Table before = tableEvent.getOldTable(); Table after = tableEvent.getNewTable(); - NotificationEvent event = new NotificationEvent(0, now(), - HCatConstants.HCAT_ALTER_TABLE_EVENT, - msgFactory.buildAlterTableMessage(before, after).toString()); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.ALTER_TABLE.toString(), msgFactory + .buildAlterTableMessage(before, after).toString()); event.setDbName(after.getDbName()); event.setTableName(after.getTableName()); enqueue(event); @@ -162,12 +165,12 @@ public void onAlterTable (AlterTableEvent tableEvent) throws MetaException { * @param partitionEvent partition event * @throws MetaException */ - public void onAddPartition (AddPartitionEvent partitionEvent) - throws MetaException { + public void onAddPartition(AddPartitionEvent partitionEvent) throws MetaException { Table t = partitionEvent.getTable(); - NotificationEvent event = new NotificationEvent(0, now(), - HCatConstants.HCAT_ADD_PARTITION_EVENT, - msgFactory.buildAddPartitionMessage(t, partitionEvent.getPartitionIterator()).toString()); + String msg = msgFactory + .buildAddPartitionMessage(t, partitionEvent.getPartitionIterator()).toString(); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.ADD_PARTITION.toString(), msg); event.setDbName(t.getDbName()); event.setTableName(t.getTableName()); enqueue(event); @@ -177,11 +180,11 @@ public void onAddPartition (AddPartitionEvent partitionEvent) * @param partitionEvent partition event * @throws MetaException */ - public void onDropPartition (DropPartitionEvent partitionEvent) throws MetaException { + public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException { Table t = partitionEvent.getTable(); - NotificationEvent event = new NotificationEvent(0, now(), - HCatConstants.HCAT_DROP_PARTITION_EVENT, - msgFactory.buildDropPartitionMessage(t, partitionEvent.getPartitionIterator()).toString()); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.DROP_PARTITION.toString(), msgFactory + .buildDropPartitionMessage(t, partitionEvent.getPartitionIterator()).toString()); event.setDbName(t.getDbName()); event.setTableName(t.getTableName()); enqueue(event); @@ -191,12 +194,12 @@ public void onDropPartition (DropPartitionEvent partitionEvent) throws MetaExce * @param partitionEvent partition event * @throws MetaException */ - public void onAlterPartition (AlterPartitionEvent partitionEvent) throws MetaException { + public void onAlterPartition(AlterPartitionEvent partitionEvent) throws MetaException { Partition before = partitionEvent.getOldPartition(); Partition after = partitionEvent.getNewPartition(); - NotificationEvent event = new NotificationEvent(0, now(), - HCatConstants.HCAT_ALTER_PARTITION_EVENT, - msgFactory.buildAlterPartitionMessage(partitionEvent.getTable(),before, after).toString()); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.ALTER_PARTITION.toString(), msgFactory + .buildAlterPartitionMessage(partitionEvent.getTable(), before, after).toString()); event.setDbName(before.getDbName()); event.setTableName(before.getTableName()); enqueue(event); @@ -206,11 +209,11 @@ public void onAlterPartition (AlterPartitionEvent partitionEvent) throws MetaEx * @param dbEvent database event * @throws MetaException */ - public void onCreateDatabase (CreateDatabaseEvent dbEvent) throws MetaException { + public void onCreateDatabase(CreateDatabaseEvent dbEvent) throws MetaException { Database db = dbEvent.getDatabase(); - NotificationEvent event = new NotificationEvent(0, now(), - HCatConstants.HCAT_CREATE_DATABASE_EVENT, - msgFactory.buildCreateDatabaseMessage(db).toString()); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.CREATE_DATABASE.toString(), msgFactory + .buildCreateDatabaseMessage(db).toString()); event.setDbName(db.getName()); enqueue(event); } @@ -219,11 +222,11 @@ public void onCreateDatabase (CreateDatabaseEvent dbEvent) throws MetaException * @param dbEvent database event * @throws MetaException */ - public void onDropDatabase (DropDatabaseEvent dbEvent) throws MetaException { + public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException { Database db = dbEvent.getDatabase(); - NotificationEvent event = new NotificationEvent(0, now(), - HCatConstants.HCAT_DROP_DATABASE_EVENT, - msgFactory.buildDropDatabaseMessage(db).toString()); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.DROP_DATABASE.toString(), msgFactory + .buildDropDatabaseMessage(db).toString()); event.setDbName(db.getName()); enqueue(event); } @@ -232,11 +235,11 @@ public void onDropDatabase (DropDatabaseEvent dbEvent) throws MetaException { * @param fnEvent function event * @throws MetaException */ - public void onCreateFunction (CreateFunctionEvent fnEvent) throws MetaException { + public void onCreateFunction(CreateFunctionEvent fnEvent) throws MetaException { Function fn = fnEvent.getFunction(); - NotificationEvent event = new NotificationEvent(0, now(), - HCatConstants.HCAT_CREATE_FUNCTION_EVENT, - msgFactory.buildCreateFunctionMessage(fn).toString()); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.CREATE_FUNCTION.toString(), msgFactory + .buildCreateFunctionMessage(fn).toString()); event.setDbName(fn.getDbName()); enqueue(event); } @@ -245,11 +248,11 @@ public void onCreateFunction (CreateFunctionEvent fnEvent) throws MetaException * @param fnEvent function event * @throws MetaException */ - public void onDropFunction (DropFunctionEvent fnEvent) throws MetaException { + public void onDropFunction(DropFunctionEvent fnEvent) throws MetaException { Function fn = fnEvent.getFunction(); - NotificationEvent event = new NotificationEvent(0, now(), - HCatConstants.HCAT_DROP_FUNCTION_EVENT, - msgFactory.buildDropFunctionMessage(fn).toString()); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.DROP_FUNCTION.toString(), msgFactory + .buildDropFunctionMessage(fn).toString()); event.setDbName(fn.getDbName()); enqueue(event); } @@ -258,11 +261,11 @@ public void onDropFunction (DropFunctionEvent fnEvent) throws MetaException { * @param indexEvent index event * @throws MetaException */ - public void onAddIndex (AddIndexEvent indexEvent) throws MetaException { + public void onAddIndex(AddIndexEvent indexEvent) throws MetaException { Index index = indexEvent.getIndex(); - NotificationEvent event = new NotificationEvent(0, now(), - HCatConstants.HCAT_CREATE_INDEX_EVENT, - msgFactory.buildCreateIndexMessage(index).toString()); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.CREATE_INDEX.toString(), msgFactory + .buildCreateIndexMessage(index).toString()); event.setDbName(index.getDbName()); enqueue(event); } @@ -271,11 +274,11 @@ public void onAddIndex (AddIndexEvent indexEvent) throws MetaException { * @param indexEvent index event * @throws MetaException */ - public void onDropIndex (DropIndexEvent indexEvent) throws MetaException { + public void onDropIndex(DropIndexEvent indexEvent) throws MetaException { Index index = indexEvent.getIndex(); - NotificationEvent event = new NotificationEvent(0, now(), - HCatConstants.HCAT_DROP_INDEX_EVENT, - msgFactory.buildDropIndexMessage(index).toString()); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.DROP_INDEX.toString(), msgFactory + .buildDropIndexMessage(index).toString()); event.setDbName(index.getDbName()); enqueue(event); } @@ -284,21 +287,22 @@ public void onDropIndex (DropIndexEvent indexEvent) throws MetaException { * @param indexEvent index event * @throws MetaException */ - public void onAlterIndex (AlterIndexEvent indexEvent) throws MetaException { + public void onAlterIndex(AlterIndexEvent indexEvent) throws MetaException { Index before = indexEvent.getOldIndex(); Index after = indexEvent.getNewIndex(); - NotificationEvent event = new NotificationEvent(0, now(), - HCatConstants.HCAT_ALTER_INDEX_EVENT, - msgFactory.buildAlterIndexMessage(before, after).toString()); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.ALTER_INDEX.toString(), msgFactory + .buildAlterIndexMessage(before, after).toString()); event.setDbName(before.getDbName()); enqueue(event); } @Override public void onInsert(InsertEvent insertEvent) throws MetaException { - NotificationEvent event = new NotificationEvent(0, now(), HCatConstants.HCAT_INSERT_EVENT, - msgFactory.buildInsertMessage(insertEvent.getDb(), insertEvent.getTable(), - insertEvent.getPartitionKeyValues(), insertEvent.getFiles()).toString()); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.INSERT.toString(), msgFactory.buildInsertMessage( + insertEvent.getDb(), insertEvent.getTable(), insertEvent.getPartitionKeyValues(), + insertEvent.getFiles()).toString()); event.setDbName(insertEvent.getDb()); event.setTableName(insertEvent.getTable()); enqueue(event); diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java index a661962..775c1d6 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java @@ -18,28 +18,31 @@ */ package org.apache.hive.hcatalog.api; -import static junit.framework.Assert.assertEquals; -import static junit.framework.Assert.assertNull; -import static junit.framework.Assert.assertTrue; -import static junit.framework.Assert.fail; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; -import junit.framework.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageFactory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hive.hcatalog.common.HCatConstants; import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; import org.apache.hive.hcatalog.listener.DbNotificationListener; -import org.apache.hive.hcatalog.messaging.HCatEventMessage; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonParser; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.node.ObjectNode; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -125,8 +128,14 @@ public void createTable() throws Exception { assertEquals(HCatConstants.HCAT_CREATE_TABLE_EVENT, event.getEventType()); assertEquals(dbName, event.getDbName()); assertEquals("hcatcreatetable", event.getTableName()); - assertTrue(event.getMessage().matches("\\{\"eventType\":\"CREATE_TABLE\",\"server\":\"\"," + - "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":\"hcatcreatetable\",\"timestamp\":[0-9]+}")); + + // Parse the message field + ObjectNode jsonTree = getJsonTree(event); + assertEquals(HCatConstants.HCAT_CREATE_TABLE_EVENT, jsonTree.get("eventType").asText()); + assertEquals("default", jsonTree.get("db").asText()); + assertEquals("hcatcreatetable", jsonTree.get("table").asText()); + Table tableObj = JSONMessageFactory.getTableObj(jsonTree); + assertEquals(table.toHiveTable(), tableObj); } // TODO - Currently no way to test alter table, as this interface doesn't support alter table @@ -166,11 +175,8 @@ public void addPartition() throws Exception { String partName = "testpart"; Map partSpec = new HashMap(1); partSpec.put(partColName, partName); - hCatClient.addPartition( - HCatAddPartitionDesc.create( - new HCatPartition(table, partSpec, null) - ).build() - ); + HCatPartition part = new HCatPartition(table, partSpec, null); + hCatClient.addPartition(HCatAddPartitionDesc.create(part).build()); List events = hCatClient.getNextNotification(firstEventId, 0, null); assertEquals(2, events.size()); @@ -181,9 +187,20 @@ public void addPartition() throws Exception { assertEquals(HCatConstants.HCAT_ADD_PARTITION_EVENT, event.getEventType()); assertEquals("default", event.getDbName()); assertEquals(tableName, event.getTableName()); - assertTrue(event.getMessage().matches("\\{\"eventType\":\"ADD_PARTITION\",\"server\":\"\"," + - "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" + - "\"hcataddparttable\",\"timestamp\":[0-9]+,\"partitions\":\\[\\{\"pc\":\"testpart\"}]}")); + + // Parse the message field + ObjectNode jsonTree = getJsonTree(event); + assertEquals(HCatConstants.HCAT_ADD_PARTITION_EVENT, jsonTree.get("eventType").asText()); + assertEquals("default", jsonTree.get("db").asText()); + assertEquals("hcataddparttable", jsonTree.get("table").asText()); + List partitionObjList = JSONMessageFactory.getPartitionObjList(jsonTree); + HCatPartition hcatPart = new HCatPartition(table, partitionObjList.get(0)); + assertEquals(part.getDatabaseName(), hcatPart.getDatabaseName()); + assertEquals(part.getTableName(), hcatPart.getTableName()); + assertEquals(part.getValues(), hcatPart.getValues()); + assertEquals(part.getColumns(), hcatPart.getColumns()); + assertEquals(part.getPartColumns(), hcatPart.getPartColumns()); + assertEquals(part.getLocation(), hcatPart.getLocation()); } // TODO - currently no way to test alter partition, as HCatClient doesn't support it. @@ -265,4 +282,10 @@ public boolean accept(NotificationEvent event) { assertEquals(1, events.size()); assertEquals(firstEventId + 1, events.get(0).getEventId()); } + + private ObjectNode getJsonTree(HCatNotificationEvent event) throws Exception { + JsonParser jsonParser = (new JsonFactory()).createJsonParser(event.getMessage()); + ObjectMapper mapper = new ObjectMapper(); + return mapper.readValue(jsonParser, ObjectNode.class); + } } diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java index 4f97cf4..690616d 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java @@ -18,25 +18,19 @@ */ package org.apache.hive.hcatalog.listener; -import static junit.framework.Assert.assertEquals; -import static junit.framework.Assert.assertNull; -import static junit.framework.Assert.assertTrue; -import static junit.framework.Assert.fail; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; -import org.apache.hadoop.hive.metastore.api.Function; -import org.apache.hadoop.hive.metastore.api.FunctionType; -import org.apache.hadoop.hive.metastore.api.Index; -import org.apache.hadoop.hive.metastore.api.Order; -import org.apache.hadoop.hive.metastore.api.PrincipalType; -import org.apache.hadoop.hive.metastore.api.ResourceType; -import org.apache.hadoop.hive.metastore.api.ResourceUri; -import org.apache.htrace.fasterxml.jackson.core.JsonFactory; -import org.apache.htrace.fasterxml.jackson.core.JsonParser; -import org.apache.htrace.fasterxml.jackson.databind.JsonNode; -import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper; -import org.apache.htrace.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.thrift.TDeserializer; -import org.apache.thrift.protocol.TJSONProtocol; import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; @@ -45,29 +39,37 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.FireEventRequest; import org.apache.hadoop.hive.metastore.api.FireEventRequestData; +import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hadoop.hive.metastore.api.FunctionType; +import org.apache.hadoop.hive.metastore.api.Index; import org.apache.hadoop.hive.metastore.api.InsertEventRequestData; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; +import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.PrincipalType; +import org.apache.hadoop.hive.metastore.api.ResourceType; +import org.apache.hadoop.hive.metastore.api.ResourceUri; import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageFactory; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hive.hcatalog.common.HCatConstants; +import org.apache.thrift.TDeserializer; +import org.apache.thrift.protocol.TJSONProtocol; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.JsonParser; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.node.ObjectNode; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - public class TestDbNotificationListener { private static final Logger LOG = LoggerFactory.getLogger(TestDbNotificationListener.class.getName()); private static final int EVENTS_TTL = 30; @@ -183,26 +185,34 @@ public void createTable() throws Exception { List cols = new ArrayList(); cols.add(new FieldSchema("col1", "int", "nocomment")); SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); - StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, - serde, null, null, emptyParameters); - Table table = new Table("mytable", "default", "me", startTime, startTime, 0, sd, null, - emptyParameters, null, null, null); + StorageDescriptor sd = + new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, serde, null, null, + emptyParameters); + Table table = + new Table("mytable", "default", "me", startTime, startTime, 0, sd, null, emptyParameters, + null, null, null); msClient.createTable(table); - + // Get the event NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null); assertEquals(1, rsp.getEventsSize()); - NotificationEvent event = rsp.getEvents().get(0); assertEquals(firstEventId + 1, event.getEventId()); assertTrue(event.getEventTime() >= startTime); assertEquals(HCatConstants.HCAT_CREATE_TABLE_EVENT, event.getEventType()); assertEquals("default", event.getDbName()); assertEquals("mytable", event.getTableName()); - assertTrue(event.getMessage().matches("\\{\"eventType\":\"CREATE_TABLE\",\"server\":\"\"," + - "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":\"mytable\",\"timestamp\":[0-9]+}")); - table = new Table("mytable2", "default", "me", startTime, startTime, 0, sd, null, - emptyParameters, null, null, null); + // Parse the message field + ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event); + assertEquals(HCatConstants.HCAT_CREATE_TABLE_EVENT, jsonTree.get("eventType").asText()); + assertEquals("default", jsonTree.get("db").asText()); + assertEquals("mytable", jsonTree.get("table").asText()); + Table tableObj = JSONMessageFactory.getTableObj(jsonTree); + assertEquals(table, tableObj); + + table = + new Table("mytable2", "default", "me", startTime, startTime, 0, sd, null, emptyParameters, + null, null, null); DummyRawStoreFailEvent.setEventSucceed(false); try { msClient.createTable(table); @@ -223,11 +233,12 @@ public void alterTable() throws Exception { serde, null, null, emptyParameters); Table table = new Table("alttable", "default", "me", startTime, startTime, 0, sd, new ArrayList(), emptyParameters, null, null, null); + // Event 1 msClient.createTable(table); - cols.add(new FieldSchema("col2", "int", "")); table = new Table("alttable", "default", "me", startTime, startTime, 0, sd, new ArrayList(), emptyParameters, null, null, null); + // Event 2 msClient.alter_table("default", "alttable", table); NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null); @@ -239,9 +250,14 @@ public void alterTable() throws Exception { assertEquals(HCatConstants.HCAT_ALTER_TABLE_EVENT, event.getEventType()); assertEquals("default", event.getDbName()); assertEquals("alttable", event.getTableName()); - assertTrue(event.getMessage().matches("\\{\"eventType\":\"ALTER_TABLE\",\"server\":\"\"," + - "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":\"alttable\"," + - "\"timestamp\":[0-9]+}")); + + // Parse the message field + ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event); + assertEquals(HCatConstants.HCAT_ALTER_TABLE_EVENT, jsonTree.get("eventType").asText()); + assertEquals("default", jsonTree.get("db").asText()); + assertEquals("alttable", jsonTree.get("table").asText()); + Table tableObj = JSONMessageFactory.getTableObj(jsonTree); + assertEquals(table, tableObj); DummyRawStoreFailEvent.setEventSucceed(false); try { @@ -319,9 +335,14 @@ public void addPartition() throws Exception { assertEquals(HCatConstants.HCAT_ADD_PARTITION_EVENT, event.getEventType()); assertEquals("default", event.getDbName()); assertEquals("addparttable", event.getTableName()); - assertTrue(event.getMessage().matches("\\{\"eventType\":\"ADD_PARTITION\",\"server\":\"\"," + - "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" + - "\"addparttable\",\"timestamp\":[0-9]+,\"partitions\":\\[\\{\"ds\":\"today\"}]}")); + + // Parse the message field + ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event); + assertEquals(HCatConstants.HCAT_ADD_PARTITION_EVENT, jsonTree.get("eventType").asText()); + assertEquals("default", jsonTree.get("db").asText()); + assertEquals("addparttable", jsonTree.get("table").asText()); + List partitionObjList = JSONMessageFactory.getPartitionObjList(jsonTree); + assertEquals(partition, partitionObjList.get(0)); partition = new Partition(Arrays.asList("tomorrow"), "default", "tableDoesNotExist", startTime, startTime, sd, emptyParameters); @@ -365,10 +386,14 @@ public void alterPartition() throws Exception { assertEquals(HCatConstants.HCAT_ALTER_PARTITION_EVENT, event.getEventType()); assertEquals("default", event.getDbName()); assertEquals("alterparttable", event.getTableName()); - assertTrue(event.getMessage(), - event.getMessage().matches("\\{\"eventType\":\"ALTER_PARTITION\",\"server\":\"\"," + - "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":\"alterparttable\"," + - "\"timestamp\":[0-9]+,\"keyValues\":\\{\"ds\":\"today\"}}")); + + // Parse the message field + ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event); + assertEquals(HCatConstants.HCAT_ALTER_PARTITION_EVENT, jsonTree.get("eventType").asText()); + assertEquals("default", jsonTree.get("db").asText()); + assertEquals("alterparttable", jsonTree.get("table").asText()); + List partitionObjList = JSONMessageFactory.getPartitionObjList(jsonTree); + assertEquals(newPart, partitionObjList.get(0)); DummyRawStoreFailEvent.setEventSucceed(false); try { @@ -445,7 +470,7 @@ public void createFunction() throws Exception { assertTrue(event.getEventTime() >= startTime); assertEquals(HCatConstants.HCAT_CREATE_FUNCTION_EVENT, event.getEventType()); assertEquals(dbName, event.getDbName()); - Function funcObj = getFunctionObj(getJsonTree(event)); + Function funcObj = JSONMessageFactory.getFunctionObj(JSONMessageFactory.getJsonTree(event)); assertEquals(dbName, funcObj.getDbName()); assertEquals(funcName, funcObj.getFunctionName()); assertEquals(funcClass, funcObj.getClassName()); @@ -488,7 +513,7 @@ public void dropFunction() throws Exception { assertTrue(event.getEventTime() >= startTime); assertEquals(HCatConstants.HCAT_DROP_FUNCTION_EVENT, event.getEventType()); assertEquals(dbName, event.getDbName()); - Function funcObj = getFunctionObj(getJsonTree(event)); + Function funcObj = JSONMessageFactory.getFunctionObj(JSONMessageFactory.getJsonTree(event)); assertEquals(dbName, funcObj.getDbName()); assertEquals(funcName, funcObj.getFunctionName()); assertEquals(funcClass, funcObj.getClassName()); @@ -542,7 +567,7 @@ public void createIndex() throws Exception { assertTrue(event.getEventTime() >= startTime); assertEquals(HCatConstants.HCAT_CREATE_INDEX_EVENT, event.getEventType()); assertEquals(dbName, event.getDbName()); - Index indexObj = getIndexObj(getJsonTree(event)); + Index indexObj = JSONMessageFactory.getIndexObj(JSONMessageFactory.getJsonTree(event)); assertEquals(dbName, indexObj.getDbName()); assertEquals(indexName, indexObj.getIndexName()); assertEquals(tableName, indexObj.getOrigTableName()); @@ -593,7 +618,7 @@ public void dropIndex() throws Exception { assertTrue(event.getEventTime() >= startTime); assertEquals(HCatConstants.HCAT_DROP_INDEX_EVENT, event.getEventType()); assertEquals(dbName, event.getDbName()); - Index indexObj = getIndexObj(getJsonTree(event)); + Index indexObj = JSONMessageFactory.getIndexObj(JSONMessageFactory.getJsonTree(event)); assertEquals(dbName, indexObj.getDbName()); assertEquals(indexName.toLowerCase(), indexObj.getIndexName()); assertEquals(tableName.toLowerCase(), indexObj.getOrigTableName()); @@ -647,7 +672,7 @@ public void alterIndex() throws Exception { assertTrue(event.getEventTime() >= startTime); assertEquals(HCatConstants.HCAT_ALTER_INDEX_EVENT, event.getEventType()); assertEquals(dbName, event.getDbName()); - Index indexObj = getIndexObj(getJsonTree(event), "afterIndexObjJson"); + Index indexObj = JSONMessageFactory.getIndexObj(JSONMessageFactory.getJsonTree(event), "afterIndexObjJson"); assertEquals(dbName, indexObj.getDbName()); assertEquals(indexName, indexObj.getIndexName()); assertEquals(tableName, indexObj.getOrigTableName()); @@ -971,30 +996,4 @@ public void cleanupNotifs() throws Exception { LOG.info("second trigger done"); assertEquals(0, rsp2.getEventsSize()); } - - private ObjectNode getJsonTree(NotificationEvent event) throws Exception { - JsonParser jsonParser = (new JsonFactory()).createJsonParser(event.getMessage()); - ObjectMapper mapper = new ObjectMapper(); - return mapper.readValue(jsonParser, ObjectNode.class); - } - - private Function getFunctionObj(JsonNode jsonTree) throws Exception { - TDeserializer deSerializer = new TDeserializer(new TJSONProtocol.Factory()); - Function funcObj = new Function(); - String tableJson = jsonTree.get("functionObjJson").asText(); - deSerializer.deserialize(funcObj, tableJson, "UTF-8"); - return funcObj; - } - - private Index getIndexObj(JsonNode jsonTree) throws Exception { - return getIndexObj(jsonTree, "indexObjJson"); - } - - private Index getIndexObj(JsonNode jsonTree, String indexObjKey) throws Exception { - TDeserializer deSerializer = new TDeserializer(new TJSONProtocol.Factory()); - Index indexObj = new Index(); - String tableJson = jsonTree.get(indexObjKey).asText(); - deSerializer.deserialize(indexObj, tableJson, "UTF-8"); - return indexObj; - } } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AddPartitionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AddPartitionMessage.java new file mode 100644 index 0000000..26898f2 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AddPartitionMessage.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging; + +import java.util.List; +import java.util.Map; + +public abstract class AddPartitionMessage extends EventMessage { + + protected AddPartitionMessage() { + super(EventType.ADD_PARTITION); + } + + /** + * Getter for name of table (where partitions are added). + * @return Table-name (String). + */ + public abstract String getTable(); + + /** + * Getter for list of partitions added. + * @return List of maps, where each map identifies values for each partition-key, for every added partition. + */ + public abstract List> getPartitions (); + + @Override + public EventMessage checkValid() { + if (getTable() == null) + throw new IllegalStateException("Table name unset."); + if (getPartitions() == null) + throw new IllegalStateException("Partition-list unset."); + return super.checkValid(); + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterIndexMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterIndexMessage.java new file mode 100644 index 0000000..0fc7f9e --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterIndexMessage.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging; + +public abstract class AlterIndexMessage extends EventMessage { + + protected AlterIndexMessage() { + super(EventType.ALTER_INDEX); + } +} \ No newline at end of file diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java new file mode 100644 index 0000000..d89dba1 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hive.metastore.messaging; + +import java.util.Map; + +public abstract class AlterPartitionMessage extends EventMessage { + + protected AlterPartitionMessage() { + super(EventType.ALTER_PARTITION); + } + + public abstract String getTable(); + + public abstract Map getKeyValues(); + + @Override + public EventMessage checkValid() { + if (getTable() == null) throw new IllegalStateException("Table name unset."); + if (getKeyValues() == null) throw new IllegalStateException("Partition values unset"); + return super.checkValid(); + } +} + diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java new file mode 100644 index 0000000..99e678a --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hive.metastore.messaging; + +public abstract class AlterTableMessage extends EventMessage { + + protected AlterTableMessage() { + super(EventType.ALTER_TABLE); + } + + public abstract String getTable(); + + @Override + public EventMessage checkValid() { + if (getTable() == null) throw new IllegalStateException("Table name unset."); + return super.checkValid(); + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateDatabaseMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateDatabaseMessage.java new file mode 100644 index 0000000..7614298 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateDatabaseMessage.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging; + +public abstract class CreateDatabaseMessage extends EventMessage { + + protected CreateDatabaseMessage() { + super(EventType.CREATE_DATABASE); + } + +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateFunctionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateFunctionMessage.java new file mode 100644 index 0000000..867e8ec --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateFunctionMessage.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging; + +public abstract class CreateFunctionMessage extends EventMessage { + + protected CreateFunctionMessage() { + super(EventType.CREATE_FUNCTION); + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateIndexMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateIndexMessage.java new file mode 100644 index 0000000..81676aa --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateIndexMessage.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging; + +public abstract class CreateIndexMessage extends EventMessage { + + protected CreateIndexMessage() { + super(EventType.CREATE_INDEX); + } +} \ No newline at end of file diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateTableMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateTableMessage.java new file mode 100644 index 0000000..c88c59c --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateTableMessage.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging; + +public abstract class CreateTableMessage extends EventMessage { + + protected CreateTableMessage() { + super(EventType.CREATE_TABLE); + } + + /** + * Getter for the name of table created in HCatalog. + * @return Table-name (String). + */ + public abstract String getTable(); + + @Override + public EventMessage checkValid() { + if (getTable() == null) + throw new IllegalStateException("Table name unset."); + return super.checkValid(); + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropDatabaseMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropDatabaseMessage.java new file mode 100644 index 0000000..fa6da38 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropDatabaseMessage.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging; + +public abstract class DropDatabaseMessage extends EventMessage { + + protected DropDatabaseMessage() { + super(EventType.DROP_DATABASE); + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropFunctionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropFunctionMessage.java new file mode 100644 index 0000000..82cdc44 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropFunctionMessage.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging; + +public abstract class DropFunctionMessage extends EventMessage { + + protected DropFunctionMessage() { + super(EventType.DROP_FUNCTION); + } +} \ No newline at end of file diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropIndexMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropIndexMessage.java new file mode 100644 index 0000000..ce7b760 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropIndexMessage.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging; + +public abstract class DropIndexMessage extends EventMessage { + + protected DropIndexMessage() { + super(EventType.DROP_INDEX); + } +} \ No newline at end of file diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropPartitionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropPartitionMessage.java new file mode 100644 index 0000000..26aecb3 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropPartitionMessage.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging; + +import java.util.List; +import java.util.Map; + +public abstract class DropPartitionMessage extends EventMessage { + + protected DropPartitionMessage() { + super(EventType.DROP_PARTITION); + } + + public abstract String getTable(); + public abstract List> getPartitions (); + + @Override + public EventMessage checkValid() { + if (getTable() == null) + throw new IllegalStateException("Table name unset."); + if (getPartitions() == null) + throw new IllegalStateException("Partition-list unset."); + return super.checkValid(); + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropTableMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropTableMessage.java new file mode 100644 index 0000000..64a8cc5 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropTableMessage.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging; + +public abstract class DropTableMessage extends EventMessage { + + protected DropTableMessage() { + super(EventType.DROP_TABLE); + } + + /** + * Getter for the name of the table being dropped. + * @return Table-name (String). + */ + public abstract String getTable(); + + @Override + public EventMessage checkValid() { + if (getTable() == null) + throw new IllegalStateException("Table name unset."); + return super.checkValid(); + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java new file mode 100644 index 0000000..1ec0de0 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging; + +/** + * Class representing messages emitted when Metastore operations are done. + * (E.g. Creation and deletion of databases, tables and partitions.) + */ +public abstract class EventMessage { + + /** + * Enumeration of all supported types of Metastore operations. + */ + public static enum EventType { + + CREATE_DATABASE(MessageFactory.CREATE_DATABASE_EVENT), + DROP_DATABASE(MessageFactory.DROP_DATABASE_EVENT), + CREATE_TABLE(MessageFactory.CREATE_TABLE_EVENT), + DROP_TABLE(MessageFactory.DROP_TABLE_EVENT), + ADD_PARTITION(MessageFactory.ADD_PARTITION_EVENT), + DROP_PARTITION(MessageFactory.DROP_PARTITION_EVENT), + ALTER_TABLE(MessageFactory.ALTER_TABLE_EVENT), + ALTER_PARTITION(MessageFactory.ALTER_PARTITION_EVENT), + INSERT(MessageFactory.INSERT_EVENT), + CREATE_FUNCTION(MessageFactory.CREATE_FUNCTION_EVENT), + DROP_FUNCTION(MessageFactory.DROP_FUNCTION_EVENT), + CREATE_INDEX(MessageFactory.CREATE_INDEX_EVENT), + DROP_INDEX(MessageFactory.DROP_INDEX_EVENT), + ALTER_INDEX(MessageFactory.ALTER_INDEX_EVENT); + + private String typeString; + + EventType(String typeString) { + this.typeString = typeString; + } + + @Override + public String toString() { return typeString; } + } + + protected EventType eventType; + + protected EventMessage(EventType eventType) { + this.eventType = eventType; + } + + public EventType getEventType() { + return eventType; + } + + /** + * Getter for HCatalog Server's URL. + * (This is where the event originates from.) + * @return HCatalog Server's URL (String). + */ + public abstract String getServer(); + + /** + * Getter for the Kerberos principal of the HCatalog service. + * @return HCatalog Service Principal (String). + */ + public abstract String getServicePrincipal(); + + /** + * Getter for the name of the Database on which the Metastore operation is done. + * @return Database-name (String). + */ + public abstract String getDB(); + + /** + * Getter for the timestamp associated with the operation. + * @return Timestamp (Long - seconds since epoch). + */ + public abstract Long getTimestamp(); + + /** + * Class invariant. Checked after construction or deserialization. + */ + public EventMessage checkValid() { + if (getServer() == null || getServicePrincipal() == null) + throw new IllegalStateException("Server-URL/Service-Principal shouldn't be null."); + if (getEventType() == null) + throw new IllegalStateException("Event-type unset."); + if (getDB() == null) + throw new IllegalArgumentException("DB-name unset."); + + return this; + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java new file mode 100644 index 0000000..932af7e --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hive.metastore.messaging; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.thrift.TException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +public class EventUtils { + + /** + * Utility function that constructs a notification filter to match a given db name and/or table name. + * If dbName == null, fetches all warehouse events. + * If dnName != null, but tableName == null, fetches all events for the db + * If dbName != null && tableName != null, fetches all events for the specified table + * @param dbName + * @param tableName + * @return + */ + public static IMetaStoreClient.NotificationFilter getDbTblNotificationFilter(final String dbName, final String tableName){ + return new IMetaStoreClient.NotificationFilter() { + @Override + public boolean accept(NotificationEvent event) { + if (event == null){ + return false; // get rid of trivial case first, so that we can safely assume non-null + } + if (dbName == null){ + return true; // if our dbName is null, we're interested in all wh events + } + if (dbName.equalsIgnoreCase(event.getDbName())){ + if ( (tableName == null) + // if our dbName is equal, but tableName is blank, we're interested in this db-level event + || (tableName.equalsIgnoreCase(event.getTableName())) + // table level event that matches us + ){ + return true; + } + } + return false; + } + }; + } + + + public interface NotificationFetcher { + public int getBatchSize() throws IOException; + public long getCurrentNotificationEventId() throws IOException; + public List getNextNotificationEvents( + long pos, IMetaStoreClient.NotificationFilter filter) throws IOException; + } + + // MetaStoreClient-based impl of NotificationFetcher + public static class MSClientNotificationFetcher implements NotificationFetcher{ + + private IMetaStoreClient msc = null; + private Integer batchSize = null; + + public MSClientNotificationFetcher(IMetaStoreClient msc){ + this.msc = msc; + } + + @Override + public int getBatchSize() throws IOException { + if (batchSize == null){ + try { + batchSize = Integer.parseInt( + msc.getConfigValue(HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX.varname, "50")); + // TODO: we're asking the metastore what its configuration for this var is - we may + // want to revisit to pull from client side instead. The reason I have it this way + // is because the metastore is more likely to have a reasonable config for this than + // an arbitrary client. + } catch (TException e) { + throw new IOException(e); + } + } + return batchSize; + } + + @Override + public long getCurrentNotificationEventId() throws IOException { + try { + return msc.getCurrentNotificationEventId().getEventId(); + } catch (TException e) { + throw new IOException(e); + } + } + + @Override + public List getNextNotificationEvents( + long pos, IMetaStoreClient.NotificationFilter filter) throws IOException { + try { + return msc.getNextNotification(pos,getBatchSize(), filter).getEvents(); + } catch (TException e) { + throw new IOException(e); + } + } + } + + public static class NotificationEventIterator implements Iterator { + + private NotificationFetcher nfetcher; + private IMetaStoreClient.NotificationFilter filter; + private int maxEvents; + + private Iterator batchIter = null; + private List batch = null; + private long pos; + private long maxPos; + private int eventCount; + + public NotificationEventIterator( + NotificationFetcher nfetcher, long eventFrom, int maxEvents, + String dbName, String tableName) throws IOException { + init(nfetcher, eventFrom, maxEvents, EventUtils.getDbTblNotificationFilter(dbName, tableName)); + // using init(..) instead of this(..) because the EventUtils.getDbTblNotificationFilter + // is an operation that needs to run before delegating to the other ctor, and this messes up chaining + // ctors + } + + public NotificationEventIterator( + NotificationFetcher nfetcher, long eventFrom, int maxEvents, + IMetaStoreClient.NotificationFilter filter) throws IOException { + init(nfetcher,eventFrom,maxEvents,filter); + } + + private void init( + NotificationFetcher nfetcher, long eventFrom, int maxEvents, + IMetaStoreClient.NotificationFilter filter) throws IOException { + this.nfetcher = nfetcher; + this.filter = filter; + this.pos = eventFrom; + if (maxEvents < 1){ + // 0 or -1 implies fetch everything + this.maxEvents = Integer.MAX_VALUE; + } else { + this.maxEvents = maxEvents; + } + + this.eventCount = 0; + this.maxPos = nfetcher.getCurrentNotificationEventId(); + } + + private void fetchNextBatch() throws IOException { + batch = nfetcher.getNextNotificationEvents(pos, filter); + int batchSize = nfetcher.getBatchSize(); + while ( ((batch == null) || (batch.isEmpty())) && (pos < maxPos) ){ + // no valid events this batch, but we're still not done processing events + pos += batchSize; + batch = nfetcher.getNextNotificationEvents(pos,filter); + } + + if (batch == null){ + batch = new ArrayList(); + // instantiate empty list so that we don't error out on iterator fetching. + // If we're here, then the next check of pos will show our caller that + // that we've exhausted our event supply + } + batchIter = batch.iterator(); + } + + @Override + public boolean hasNext() { + if (eventCount >= maxEvents){ + // If we've already satisfied the number of events we were supposed to deliver, we end it. + return false; + } + if ((batchIter != null) && (batchIter.hasNext())){ + // If we have a valid batchIter and it has more elements, return them. + return true; + } + // If we're here, we want more events, and either batchIter is null, or batchIter + // has reached the end of the current batch. Let's fetch the next batch. + try { + fetchNextBatch(); + } catch (IOException e) { + // Regrettable that we have to wrap the IOException into a RuntimeException, + // but throwing the exception is the appropriate result here, and hasNext() + // signature will only allow RuntimeExceptions. Iterator.hasNext() really + // should have allowed IOExceptions + throw new RuntimeException(e); + } + // New batch has been fetched. If it's not empty, we have more elements to process. + return !batch.isEmpty(); + } + + @Override + public NotificationEvent next() { + eventCount++; + NotificationEvent ev = batchIter.next(); + pos = ev.getEventId(); + return ev; + } + + @Override + public void remove() { + throw new UnsupportedOperationException("remove() not supported on NotificationEventIterator"); + } + + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java new file mode 100644 index 0000000..fe747df --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging; + +import java.util.List; +import java.util.Map; + +/** + * HCat message sent when an insert is done to a table or partition. + */ +public abstract class InsertMessage extends EventMessage { + + protected InsertMessage() { + super(EventType.INSERT); + } + + /** + * Getter for the name of the table being insert into. + * @return Table-name (String). + */ + public abstract String getTable(); + + /** + * Get the map of partition keyvalues. Will be null if this insert is to a table and not a + * partition. + * @return Map of partition keyvalues, or null. + */ + public abstract Map getPartitionKeyValues(); + + /** + * Get the list of files created as a result of this DML operation. May be null. + * @return List of new files, or null. + */ + public abstract List getFiles(); + + @Override + public EventMessage checkValid() { + if (getTable() == null) + throw new IllegalStateException("Table name unset."); + return super.checkValid(); + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java new file mode 100644 index 0000000..515c455 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging; + +/** + * Interface for converting HCat events from String-form back to EventMessage instances. + */ +public abstract class MessageDeserializer { + + /** + * Method to construct EventMessage from string. + */ + public EventMessage getEventMessage(String eventTypeString, String messageBody) { + + switch (EventMessage.EventType.valueOf(eventTypeString)) { + case CREATE_DATABASE: + return getCreateDatabaseMessage(messageBody); + case DROP_DATABASE: + return getDropDatabaseMessage(messageBody); + case CREATE_TABLE: + return getCreateTableMessage(messageBody); + case ALTER_TABLE: + return getAlterTableMessage(messageBody); + case DROP_TABLE: + return getDropTableMessage(messageBody); + case ADD_PARTITION: + return getAddPartitionMessage(messageBody); + case ALTER_PARTITION: + return getAlterPartitionMessage(messageBody); + case DROP_PARTITION: + return getDropPartitionMessage(messageBody); + case CREATE_FUNCTION: + return getCreateFunctionMessage(messageBody); + case DROP_FUNCTION: + return getDropFunctionMessage(messageBody); + case CREATE_INDEX: + return getCreateIndexMessage(messageBody); + case DROP_INDEX: + return getDropIndexMessage(messageBody); + case ALTER_INDEX: + return getAlterIndexMessage(messageBody); + case INSERT: + return getInsertMessage(messageBody); + + default: + throw new IllegalArgumentException("Unsupported event-type: " + eventTypeString); + } + } + + /** + * Method to de-serialize CreateDatabaseMessage instance. + */ + public abstract CreateDatabaseMessage getCreateDatabaseMessage(String messageBody); + + /** + * Method to de-serialize DropDatabaseMessage instance. + */ + public abstract DropDatabaseMessage getDropDatabaseMessage(String messageBody); + + /** + * Method to de-serialize CreateTableMessage instance. + */ + public abstract CreateTableMessage getCreateTableMessage(String messageBody); + + /** + * Method to de-serialize AlterTableMessge + * @param messageBody string message + * @return object message + */ + public abstract AlterTableMessage getAlterTableMessage(String messageBody); + + /** + * Method to de-serialize DropTableMessage instance. + */ + public abstract DropTableMessage getDropTableMessage(String messageBody); + + /** + * Method to de-serialize AddPartitionMessage instance. + */ + public abstract AddPartitionMessage getAddPartitionMessage(String messageBody); + + /** + * Method to deserialize AlterPartitionMessage + * @param messageBody the message in serialized form + * @return message in object form + */ + public abstract AlterPartitionMessage getAlterPartitionMessage(String messageBody); + + /** + * Method to de-serialize DropPartitionMessage instance. + */ + public abstract DropPartitionMessage getDropPartitionMessage(String messageBody); + + /** + * Method to de-serialize CreateFunctionMessage instance. + */ + public abstract CreateFunctionMessage getCreateFunctionMessage(String messageBody); + + /** + * Method to de-serialize DropFunctionMessage instance. + */ + public abstract DropFunctionMessage getDropFunctionMessage(String messageBody); + + /** + * Method to de-serialize CreateIndexMessage instance. + */ + public abstract CreateIndexMessage getCreateIndexMessage(String messageBody); + + /** + * Method to de-serialize DropIndexMessage instance. + */ + public abstract DropIndexMessage getDropIndexMessage(String messageBody); + + /** + * Method to de-serialize AlterIndexMessage instance. + */ + public abstract AlterIndexMessage getAlterIndexMessage(String messageBody); + + /** + * Method to deserialize InsertMessage + * @param messageBody the message in serialized form + * @return message in object form + */ + public abstract InsertMessage getInsertMessage(String messageBody); + + // Protection against construction. + protected MessageDeserializer() {} +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java new file mode 100644 index 0000000..adf2fd8 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java @@ -0,0 +1,241 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging; + +import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hadoop.hive.metastore.api.Index; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.util.ReflectionUtils; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * Abstract Factory for the construction of HCatalog message instances. + */ +public abstract class MessageFactory { + + // Common name constants for event messages + public static final String ADD_PARTITION_EVENT = "ADD_PARTITION"; + public static final String ALTER_PARTITION_EVENT = "ALTER_PARTITION"; + public static final String DROP_PARTITION_EVENT = "DROP_PARTITION"; + public static final String CREATE_TABLE_EVENT = "CREATE_TABLE"; + public static final String ALTER_TABLE_EVENT = "ALTER_TABLE"; + public static final String DROP_TABLE_EVENT = "DROP_TABLE"; + public static final String CREATE_DATABASE_EVENT = "CREATE_DATABASE"; + public static final String DROP_DATABASE_EVENT = "DROP_DATABASE"; + public static final String INSERT_EVENT = "INSERT"; + public static final String CREATE_FUNCTION_EVENT = "CREATE_FUNCTION"; + public static final String DROP_FUNCTION_EVENT = "DROP_FUNCTION"; + public static final String CREATE_INDEX_EVENT = "CREATE_INDEX"; + public static final String DROP_INDEX_EVENT = "DROP_INDEX"; + public static final String ALTER_INDEX_EVENT = "ALTER_INDEX"; + + + private static MessageFactory instance = null; + + protected static final HiveConf hiveConf = new HiveConf(); + static { + hiveConf.addResource("hive-site.xml"); + } + + // This parameter is retained for legacy reasons, in case someone implemented custom + // factories. This, however, should not be the case, since this api was intended to + // be internal-only, and we should manage the jms and json implementations without + // needing this parameter. Marking as deprecated, for removal by 2.4 - see corresponding + // note on the getDeserializer(String,String) method + @Deprecated + private static final String CONF_LABEL_HCAT_MESSAGE_FACTORY_IMPL_PREFIX = "hcatalog.message.factory.impl."; + + protected static final String MS_SERVER_URL = hiveConf.get(HiveConf.ConfVars.METASTOREURIS.name(), ""); + protected static final String MS_SERVICE_PRINCIPAL = hiveConf.get(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.name(), ""); + + /** + * Getter for MessageFactory instance. + */ + public static MessageFactory getInstance() { + if (instance == null) { + instance = + getInstance(hiveConf.get(HiveConf.ConfVars.METASTORE_EVENT_MESSAGE_FACTORY.varname)); + } + return instance; + } + + private static MessageFactory getInstance(String className) { + try { + return (MessageFactory)ReflectionUtils.newInstance(JavaUtils.loadClass(className), hiveConf); + } + catch (ClassNotFoundException classNotFound) { + throw new IllegalStateException("Could not construct MessageFactory implementation: ", classNotFound); + } + } + + /** + * Getter for MessageDeserializer, corresponding to the specified format and version. + * @param format Serialization format for notifications. + * @param version Version of serialization format (currently ignored.) + * @return MessageDeserializer. + */ + public static MessageDeserializer getDeserializer(String format, + String version) { + return getInstance(hiveConf.get(CONF_LABEL_HCAT_MESSAGE_FACTORY_IMPL_PREFIX + format, + HiveConf.ConfVars.METASTORE_EVENT_MESSAGE_FACTORY.varname)).getDeserializer(); + // Note : The reason this method exists outside the no-arg getDeserializer method is in + // case there is a user-implemented MessageFactory that's used, and some the messages + // are in an older format and the rest in another. Then, what MessageFactory is default + // is irrelevant, we should always use the one that was used to create it to deserialize. + // + // There exist only 2 implementations of this - json and jms + // + // Additional note : rather than as a config parameter, does it make sense to have + // this use jdbc-like semantics that each MessageFactory made available register + // itself for discoverability? Might be worth pursuing. + } + + public abstract MessageDeserializer getDeserializer(); + + /** + * Getter for version-string, corresponding to all constructed messages. + */ + public abstract String getVersion(); + + /** + * Getter for message-format. + */ + public abstract String getMessageFormat(); + + /** + * Factory method for CreateDatabaseMessage. + * @param db The Database being added. + * @return CreateDatabaseMessage instance. + */ + public abstract CreateDatabaseMessage buildCreateDatabaseMessage(Database db); + + /** + * Factory method for DropDatabaseMessage. + * @param db The Database being dropped. + * @return DropDatabaseMessage instance. + */ + public abstract DropDatabaseMessage buildDropDatabaseMessage(Database db); + + /** + * Factory method for CreateTableMessage. + * @param table The Table being created. + * @return CreateTableMessage instance. + */ + public abstract CreateTableMessage buildCreateTableMessage(Table table); + + /** + * Factory method for AlterTableMessage. Unlike most of these calls, this one can return null, + * which means no message should be sent. This is because there are many flavors of alter + * table (add column, add partition, etc.). Some are covered elsewhere (like add partition) + * and some are not yet supported. + * @param before The table before the alter + * @param after The table after the alter + * @return + */ + public abstract AlterTableMessage buildAlterTableMessage(Table before, Table after); + + /** + * Factory method for DropTableMessage. + * @param table The Table being dropped. + * @return DropTableMessage instance. + */ + public abstract DropTableMessage buildDropTableMessage(Table table); + + /** + * Factory method for AddPartitionMessage. + * @param table The Table to which the partitions are added. + * @param partitions The iterator to set of Partitions being added. + * @return AddPartitionMessage instance. + */ + public abstract AddPartitionMessage buildAddPartitionMessage(Table table, Iterator partitions); + + /** + * Factory method for building AlterPartitionMessage + * @param table The table in which the partition is being altered + * @param before The partition before it was altered + * @param after The partition after it was altered + * @return a new AlterPartitionMessage + */ + public abstract AlterPartitionMessage buildAlterPartitionMessage(Table table, Partition before, + Partition after); + + /** + * Factory method for DropPartitionMessage. + * @param table The Table from which the partition is dropped. + * @param partitions The set of partitions being dropped. + * @return DropPartitionMessage instance. + */ + public abstract DropPartitionMessage buildDropPartitionMessage(Table table, Iterator partitions); + + /** + * Factory method for CreateFunctionMessage. + * @param fn The Function being added. + * @return CreateFunctionMessage instance. + */ + public abstract CreateFunctionMessage buildCreateFunctionMessage(Function fn); + + /** + * Factory method for DropFunctionMessage. + * @param fn The Function being dropped. + * @return DropFunctionMessage instance. + */ + public abstract DropFunctionMessage buildDropFunctionMessage(Function fn); + + /** + * Factory method for CreateIndexMessage. + * @param idx The Index being added. + * @return CreateIndexMessage instance. + */ + public abstract CreateIndexMessage buildCreateIndexMessage(Index idx); + + /** + * Factory method for DropIndexMessage. + * @param idx The Index being dropped. + * @return DropIndexMessage instance. + */ + public abstract DropIndexMessage buildDropIndexMessage(Index idx); + + /** + * Factory method for AlterIndexMessage. + * @param before The index before the alter + * @param after The index after the alter + * @return AlterIndexMessage + */ + public abstract AlterIndexMessage buildAlterIndexMessage(Index before, Index after); + + /** + * Factory method for building insert message + * @param db Name of the database the insert occurred in + * @param table Name of the table the insert occurred in + * @param partVals Partition values for the partition that the insert occurred in, may be null + * if the insert was done into a non-partitioned table + * @param files List of files created as a result of the insert, may be null. + * @return instance of InsertMessage + */ + public abstract InsertMessage buildInsertMessage(String db, String table, + Map partVals, List files); +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPartitionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPartitionMessage.java new file mode 100644 index 0000000..972c5c2 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPartitionMessage.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging.json; + +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage; +import org.apache.thrift.TException; +import org.codehaus.jackson.annotate.JsonProperty; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * JSON implementation of AddPartitionMessage. + */ +public class JSONAddPartitionMessage extends AddPartitionMessage { + + @JsonProperty + String server, servicePrincipal, db, table, tableObjJson; + + @JsonProperty + Long timestamp; + + @JsonProperty + List> partitions; + + @JsonProperty + List partitionListJson; + + /** + * Default Constructor. Required for Jackson. + */ + public JSONAddPartitionMessage() { + } + + public JSONAddPartitionMessage(String server, String servicePrincipal, String db, String table, + List> partitions, Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = db; + this.table = table; + this.partitions = partitions; + this.timestamp = timestamp; + checkValid(); + } + + /** + * Note that we get an Iterator rather than an Iterable here: so we can only walk thru the list once + */ + public JSONAddPartitionMessage(String server, String servicePrincipal, Table tableObj, + Iterator partitionsIterator, Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = tableObj.getDbName(); + this.table = tableObj.getTableName(); + this.timestamp = timestamp; + partitions = new ArrayList>(); + partitionListJson = new ArrayList(); + Partition partitionObj; + try { + this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj); + while (partitionsIterator.hasNext()) { + partitionObj = partitionsIterator.next(); + partitions.add(JSONMessageFactory.getPartitionKeyValues(tableObj, partitionObj)); + partitionListJson.add(JSONMessageFactory.createPartitionObjJson(partitionObj)); + } + } catch (TException e) { + throw new IllegalArgumentException("Could not serialize: ", e); + } + checkValid(); + } + + @Override + public String getServer() { + return server; + } + + @Override + public String getServicePrincipal() { + return servicePrincipal; + } + + @Override + public String getDB() { + return db; + } + + @Override + public String getTable() { + return table; + } + + @Override + public Long getTimestamp() { + return timestamp; + } + + @Override + public List> getPartitions() { + return partitions; + } + + public String getTableObjJson() { + return tableObjJson; + } + + public List getPartitionListJson() { + return partitionListJson; + } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterIndexMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterIndexMessage.java new file mode 100644 index 0000000..aa32908 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterIndexMessage.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging.json; + +import org.apache.hadoop.hive.metastore.api.Index; +import org.apache.hadoop.hive.metastore.messaging.AlterIndexMessage; +import org.apache.thrift.TException; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * JSON Implementation of AlterIndexMessage. + */ +public class JSONAlterIndexMessage extends AlterIndexMessage { + + @JsonProperty + String server, servicePrincipal, db, beforeIndexObjJson, afterIndexObjJson; + + @JsonProperty + Long timestamp; + + /** + * Default constructor, required for Jackson. + */ + public JSONAlterIndexMessage() {} + + public JSONAlterIndexMessage(String server, String servicePrincipal, Index before, Index after, + Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = after.getDbName(); + this.timestamp = timestamp; + try { + this.beforeIndexObjJson = JSONMessageFactory.createIndexObjJson(before); + this.afterIndexObjJson = JSONMessageFactory.createIndexObjJson(after); + } catch (TException ex) { + throw new IllegalArgumentException("Could not serialize Index object", ex); + } + + checkValid(); + } + + @Override + public String getDB() { return db; } + + @Override + public String getServer() { return server; } + + @Override + public String getServicePrincipal() { return servicePrincipal; } + + @Override + public Long getTimestamp() { return timestamp; } + + public String getBeforeIndexObjJson() { + return beforeIndexObjJson; + } + + public String getAfterIndexObjJson() { + return afterIndexObjJson; + } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } +} \ No newline at end of file diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java new file mode 100644 index 0000000..b62fda8 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hive.metastore.messaging.json; + +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage; +import org.apache.thrift.TException; +import org.codehaus.jackson.annotate.JsonProperty; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * JSON alter table message + */ +public class JSONAlterPartitionMessage extends AlterPartitionMessage { + + @JsonProperty + String server, servicePrincipal, db, table, tableObjJson; + + @JsonProperty + Long timestamp; + + @JsonProperty + Map keyValues; + + @JsonProperty + List partitionListJson; + + /** + * Default constructor, needed for Jackson. + */ + public JSONAlterPartitionMessage() { + } + + public JSONAlterPartitionMessage(String server, String servicePrincipal, String db, String table, + Map keyValues, Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = db; + this.table = table; + this.timestamp = timestamp; + this.keyValues = keyValues; + checkValid(); + } + + public JSONAlterPartitionMessage(String server, String servicePrincipal, Table tableObj, + Partition partitionObjBefore, Partition partitionObjAfter, Long timestamp) { + this(server, servicePrincipal, tableObj.getDbName(), tableObj.getTableName(), + JSONMessageFactory.getPartitionKeyValues(tableObj, partitionObjBefore), timestamp); + try { + this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj); + partitionListJson = new ArrayList(); + partitionListJson.add(JSONMessageFactory.createPartitionObjJson(partitionObjAfter)); + } catch (TException e) { + throw new IllegalArgumentException("Could not serialize: ", e); + } + } + + @Override + public String getServer() { + return server; + } + + @Override + public String getServicePrincipal() { + return servicePrincipal; + } + + @Override + public String getDB() { + return db; + } + + @Override + public Long getTimestamp() { + return timestamp; + } + + @Override + public String getTable() { + return table; + } + + @Override + public Map getKeyValues() { + return keyValues; + } + + public String getTableObjJson() { + return tableObjJson; + } + + public List getPartitionListJson() { + return partitionListJson; + } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } catch (Exception e) { + throw new IllegalArgumentException("Could not serialize: ", e); + } + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java new file mode 100644 index 0000000..c6e20ce --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hive.metastore.messaging.json; + +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage; +import org.apache.thrift.TException; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * JSON alter table message + */ +public class JSONAlterTableMessage extends AlterTableMessage { + + @JsonProperty + String server, servicePrincipal, db, table, tableObjJson; + + @JsonProperty + Long timestamp; + + /** + * Default constructor, needed for Jackson. + */ + public JSONAlterTableMessage() { + } + + public JSONAlterTableMessage(String server, String servicePrincipal, String db, String table, + Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = db; + this.table = table; + this.timestamp = timestamp; + checkValid(); + } + + public JSONAlterTableMessage(String server, String servicePrincipal, Table tableObj, + Long timestamp) { + this(server, servicePrincipal, tableObj.getDbName(), tableObj.getTableName(), timestamp); + try { + this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj); + } catch (TException e) { + throw new IllegalArgumentException("Could not serialize: ", e); + } + } + + @Override + public String getServer() { + return server; + } + + @Override + public String getServicePrincipal() { + return servicePrincipal; + } + + @Override + public String getDB() { + return db; + } + + @Override + public Long getTimestamp() { + return timestamp; + } + + @Override + public String getTable() { + return table; + } + + public String getTableObjJson() { + return tableObjJson; + } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } catch (Exception e) { + throw new IllegalArgumentException("Could not serialize: ", e); + } + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateDatabaseMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateDatabaseMessage.java new file mode 100644 index 0000000..f8717b2 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateDatabaseMessage.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging.json; + +import org.apache.hadoop.hive.metastore.messaging.CreateDatabaseMessage; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * JSON Implementation of CreateDatabaseMessage. + */ +public class JSONCreateDatabaseMessage extends CreateDatabaseMessage { + + @JsonProperty + String server, servicePrincipal, db; + + @JsonProperty + Long timestamp; + + /** + * Default constructor, required for Jackson. + */ + public JSONCreateDatabaseMessage() {} + + public JSONCreateDatabaseMessage(String server, String servicePrincipal, String db, Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = db; + this.timestamp = timestamp; + checkValid(); + } + + @Override + public String getDB() { return db; } + + @Override + public String getServer() { return server; } + + @Override + public String getServicePrincipal() { return servicePrincipal; } + + @Override + public Long getTimestamp() { return timestamp; } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } + +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateFunctionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateFunctionMessage.java new file mode 100644 index 0000000..a88d85f --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateFunctionMessage.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging.json; + +import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hadoop.hive.metastore.messaging.CreateFunctionMessage; +import org.apache.thrift.TException; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * JSON Implementation of CreateFunctionMessage. + */ +public class JSONCreateFunctionMessage extends CreateFunctionMessage { + + @JsonProperty + String server, servicePrincipal, db, functionObjJson; + + @JsonProperty + Long timestamp; + + /** + * Default constructor, required for Jackson. + */ + public JSONCreateFunctionMessage() {} + + public JSONCreateFunctionMessage(String server, String servicePrincipal, Function fn, Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = fn.getDbName(); + this.timestamp = timestamp; + try { + this.functionObjJson = JSONMessageFactory.createFunctionObjJson(fn); + } catch (TException ex) { + throw new IllegalArgumentException("Could not serialize Function object", ex); + } + checkValid(); + } + + @Override + public String getDB() { return db; } + + @Override + public String getServer() { return server; } + + @Override + public String getServicePrincipal() { return servicePrincipal; } + + @Override + public Long getTimestamp() { return timestamp; } + + public String getFunctionObjJson() { + return functionObjJson; + } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } +} \ No newline at end of file diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateIndexMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateIndexMessage.java new file mode 100644 index 0000000..d376c77 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateIndexMessage.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging.json; + +import org.apache.hadoop.hive.metastore.api.Index; +import org.apache.hadoop.hive.metastore.messaging.CreateIndexMessage; +import org.apache.thrift.TException; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * JSON Implementation of CreateIndexMessage. + */ +public class JSONCreateIndexMessage extends CreateIndexMessage { + + @JsonProperty + String server, servicePrincipal, db, indexObjJson; + + @JsonProperty + Long timestamp; + + /** + * Default constructor, required for Jackson. + */ + public JSONCreateIndexMessage() {} + + public JSONCreateIndexMessage(String server, String servicePrincipal, Index index, Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = index.getDbName(); + try { + this.indexObjJson = JSONMessageFactory.createIndexObjJson(index); + } catch (TException ex) { + throw new IllegalArgumentException("Could not serialize Index object", ex); + } + + this.timestamp = timestamp; + checkValid(); + } + + @Override + public String getDB() { return db; } + + @Override + public String getServer() { return server; } + + @Override + public String getServicePrincipal() { return servicePrincipal; } + + @Override + public Long getTimestamp() { return timestamp; } + + public String getIndexObjJson() { + return indexObjJson; + } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } +} \ No newline at end of file diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateTableMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateTableMessage.java new file mode 100644 index 0000000..aa737ca --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateTableMessage.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging.json; + +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage; +import org.apache.thrift.TException; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * JSON implementation of CreateTableMessage. + */ +public class JSONCreateTableMessage extends CreateTableMessage { + + @JsonProperty + String server, servicePrincipal, db, table, tableObjJson; + @JsonProperty + Long timestamp; + + /** + * Default constructor, needed for Jackson. + */ + public JSONCreateTableMessage() { + } + + public JSONCreateTableMessage(String server, String servicePrincipal, String db, String table, + Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = db; + this.table = table; + this.timestamp = timestamp; + checkValid(); + } + + public JSONCreateTableMessage(String server, String servicePrincipal, Table tableObj, + Long timestamp) { + this(server, servicePrincipal, tableObj.getDbName(), tableObj.getTableName(), timestamp); + try { + this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj); + } catch (TException e) { + throw new IllegalArgumentException("Could not serialize: ", e); + } + } + + @Override + public String getServer() { + return server; + } + + @Override + public String getServicePrincipal() { + return servicePrincipal; + } + + @Override + public String getDB() { + return db; + } + + @Override + public Long getTimestamp() { + return timestamp; + } + + @Override + public String getTable() { + return table; + } + + public String getTableObjJson() { + return tableObjJson; + } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropDatabaseMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropDatabaseMessage.java new file mode 100644 index 0000000..be17e6d --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropDatabaseMessage.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging.json; + +import org.apache.hadoop.hive.metastore.messaging.DropDatabaseMessage; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * JSON implementation of DropDatabaseMessage. + */ +public class JSONDropDatabaseMessage extends DropDatabaseMessage { + + @JsonProperty + String server, servicePrincipal, db; + + @JsonProperty + Long timestamp; + + /** + * Default constructor, required for Jackson. + */ + public JSONDropDatabaseMessage() {} + + public JSONDropDatabaseMessage(String server, String servicePrincipal, String db, Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = db; + this.timestamp = timestamp; + checkValid(); + } + + + @Override + public String getServer() { return server; } + + @Override + public String getServicePrincipal() { return servicePrincipal; } + + @Override + public String getDB() { return db; } + + @Override + public Long getTimestamp() { return timestamp; } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropFunctionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropFunctionMessage.java new file mode 100644 index 0000000..d994872 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropFunctionMessage.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging.json; + +import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hadoop.hive.metastore.messaging.DropFunctionMessage; +import org.apache.thrift.TException; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * JSON Implementation of CreateDatabaseMessage. + */ +public class JSONDropFunctionMessage extends DropFunctionMessage { + + @JsonProperty + String server, servicePrincipal, db, functionObjJson; + + @JsonProperty + Long timestamp; + + /** + * Default constructor, required for Jackson. + */ + public JSONDropFunctionMessage() {} + + public JSONDropFunctionMessage(String server, String servicePrincipal, Function fn, Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = fn.getDbName(); + this.timestamp = timestamp; + try { + this.functionObjJson = JSONMessageFactory.createFunctionObjJson(fn); + } catch (TException ex) { + throw new IllegalArgumentException("Could not serialize Function object", ex); + } + checkValid(); + } + + @Override + public String getDB() { return db; } + + @Override + public String getServer() { return server; } + + @Override + public String getServicePrincipal() { return servicePrincipal; } + + @Override + public Long getTimestamp() { return timestamp; } + + public String getFunctionObjJson() { + return functionObjJson; + } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } +} \ No newline at end of file diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropIndexMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropIndexMessage.java new file mode 100644 index 0000000..131d345 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropIndexMessage.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging.json; + +import org.apache.hadoop.hive.metastore.api.Index; +import org.apache.hadoop.hive.metastore.messaging.DropIndexMessage; +import org.apache.thrift.TException; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * JSON Implementation of DropIndexMessage. + */ +public class JSONDropIndexMessage extends DropIndexMessage { + + @JsonProperty + String server, servicePrincipal, db, indexObjJson; + + @JsonProperty + Long timestamp; + + /** + * Default constructor, required for Jackson. + */ + public JSONDropIndexMessage() {} + + public JSONDropIndexMessage(String server, String servicePrincipal, Index index, Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = index.getDbName(); + try { + this.indexObjJson = JSONMessageFactory.createIndexObjJson(index); + } catch (TException ex) { + throw new IllegalArgumentException("Could not serialize Index object", ex); + } + + this.timestamp = timestamp; + checkValid(); + } + + @Override + public String getDB() { return db; } + + @Override + public String getServer() { return server; } + + @Override + public String getServicePrincipal() { return servicePrincipal; } + + @Override + public Long getTimestamp() { return timestamp; } + + public String getIndexObjJson() { + return indexObjJson; + } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } +} \ No newline at end of file diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropPartitionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropPartitionMessage.java new file mode 100644 index 0000000..b8ea224 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropPartitionMessage.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging.json; + +import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage; +import org.codehaus.jackson.annotate.JsonProperty; + +import java.util.List; +import java.util.Map; + +/** + * JSON implementation of DropPartitionMessage. + */ +public class JSONDropPartitionMessage extends DropPartitionMessage { + + @JsonProperty + String server, servicePrincipal, db, table; + + @JsonProperty + Long timestamp; + + @JsonProperty + List> partitions; + + /** + * Default Constructor. Required for Jackson. + */ + public JSONDropPartitionMessage() { + } + + public JSONDropPartitionMessage(String server, String servicePrincipal, String db, String table, + List> partitions, Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = db; + this.table = table; + this.partitions = partitions; + this.timestamp = timestamp; + checkValid(); + } + + @Override + public String getServer() { + return server; + } + + @Override + public String getServicePrincipal() { + return servicePrincipal; + } + + @Override + public String getDB() { + return db; + } + + @Override + public String getTable() { + return table; + } + + @Override + public Long getTimestamp() { + return timestamp; + } + + @Override + public List> getPartitions() { + return partitions; + } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropTableMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropTableMessage.java new file mode 100644 index 0000000..635ab61 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropTableMessage.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging.json; + +import org.apache.hadoop.hive.metastore.messaging.DropTableMessage; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * JSON implementation of DropTableMessage. + */ +public class JSONDropTableMessage extends DropTableMessage { + + @JsonProperty + String server, servicePrincipal, db, table; + + @JsonProperty + Long timestamp; + + /** + * Default constructor, needed for Jackson. + */ + public JSONDropTableMessage() { + } + + public JSONDropTableMessage(String server, String servicePrincipal, String db, String table, + Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = db; + this.table = table; + this.timestamp = timestamp; + checkValid(); + } + + @Override + public String getTable() { + return table; + } + + @Override + public String getServer() { + return server; + } + + @Override + public String getServicePrincipal() { + return servicePrincipal; + } + + @Override + public String getDB() { + return db; + } + + @Override + public Long getTimestamp() { + return timestamp; + } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } + +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java new file mode 100644 index 0000000..ef89b17 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging.json; + +import org.apache.hadoop.hive.metastore.messaging.InsertMessage; +import org.codehaus.jackson.annotate.JsonProperty; + +import java.util.List; +import java.util.Map; + +/** + * JSON implementation of DropTableMessage. + */ +public class JSONInsertMessage extends InsertMessage { + + @JsonProperty + String server, servicePrincipal, db, table; + + @JsonProperty + Long timestamp; + + @JsonProperty + List files; + + @JsonProperty + Map partKeyVals; + + /** + * Default constructor, needed for Jackson. + */ + public JSONInsertMessage() {} + + public JSONInsertMessage(String server, String servicePrincipal, String db, String table, + Map partKeyVals, List files, Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = db; + this.table = table; + this.timestamp = timestamp; + this.partKeyVals = partKeyVals; + this.files = files; + checkValid(); + } + + + @Override + public String getTable() { return table; } + + @Override + public String getServer() { return server; } + + @Override + public Map getPartitionKeyValues() { + return partKeyVals; + } + + @Override + public List getFiles() { + return files; + } + + @Override + public String getServicePrincipal() { return servicePrincipal; } + + @Override + public String getDB() { return db; } + + @Override + public Long getTimestamp() { return timestamp; } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } + +} \ No newline at end of file diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java new file mode 100644 index 0000000..41732c7 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging.json; + +import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.AlterIndexMessage; +import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage; +import org.apache.hadoop.hive.metastore.messaging.CreateDatabaseMessage; +import org.apache.hadoop.hive.metastore.messaging.CreateFunctionMessage; +import org.apache.hadoop.hive.metastore.messaging.CreateIndexMessage; +import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage; +import org.apache.hadoop.hive.metastore.messaging.DropDatabaseMessage; +import org.apache.hadoop.hive.metastore.messaging.DropFunctionMessage; +import org.apache.hadoop.hive.metastore.messaging.DropIndexMessage; +import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.DropTableMessage; +import org.apache.hadoop.hive.metastore.messaging.InsertMessage; +import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer; +import org.codehaus.jackson.map.DeserializationConfig; +import org.codehaus.jackson.map.ObjectMapper; + +/** + * MessageDeserializer implementation, for deserializing from JSON strings. + */ +public class JSONMessageDeserializer extends MessageDeserializer { + + static ObjectMapper mapper = new ObjectMapper(); // Thread-safe. + + static { + mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } + + @Override + public CreateDatabaseMessage getCreateDatabaseMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONCreateDatabaseMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct JSONCreateDatabaseMessage.", exception); + } + } + + @Override + public DropDatabaseMessage getDropDatabaseMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONDropDatabaseMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct JSONDropDatabaseMessage.", exception); + } + } + + @Override + public CreateTableMessage getCreateTableMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONCreateTableMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct JSONCreateTableMessage.", exception); + } + } + + @Override + public AlterTableMessage getAlterTableMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONAlterTableMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct appropriate alter table type.", + exception); + } + } + + @Override + public DropTableMessage getDropTableMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONDropTableMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct JSONDropTableMessage.", exception); + } + } + + @Override + public AddPartitionMessage getAddPartitionMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONAddPartitionMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct AddPartitionMessage.", exception); + } + } + + @Override + public AlterPartitionMessage getAlterPartitionMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONAlterPartitionMessage.class); + } catch (Exception e) { + throw new IllegalArgumentException("Could not construct AlterPartitionMessage.", e); + } + } + + @Override + public DropPartitionMessage getDropPartitionMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONDropPartitionMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct DropPartitionMessage.", exception); + } + } + + @Override + public CreateFunctionMessage getCreateFunctionMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONCreateFunctionMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct JSONCreateFunctionMessage.", exception); + } + } + + @Override + public DropFunctionMessage getDropFunctionMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONDropFunctionMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct JSONDropDatabaseMessage.", exception); + } + } + + @Override + public CreateIndexMessage getCreateIndexMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONCreateIndexMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct JSONCreateIndexMessage.", exception); + } + } + + @Override + public DropIndexMessage getDropIndexMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONDropIndexMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct JSONDropIndexMessage.", exception); + } + } + + @Override + public AlterIndexMessage getAlterIndexMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONAlterIndexMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct JSONAlterIndexMessage.", exception); + } + } + + @Override + public InsertMessage getInsertMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONInsertMessage.class); + } catch (Exception e) { + throw new IllegalArgumentException("Could not construct InsertMessage", e); + } + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java new file mode 100644 index 0000000..0407210 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java @@ -0,0 +1,258 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging.json; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import javax.annotation.Nullable; + +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hadoop.hive.metastore.api.Index; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.AlterIndexMessage; +import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage; +import org.apache.hadoop.hive.metastore.messaging.CreateDatabaseMessage; +import org.apache.hadoop.hive.metastore.messaging.CreateFunctionMessage; +import org.apache.hadoop.hive.metastore.messaging.CreateIndexMessage; +import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage; +import org.apache.hadoop.hive.metastore.messaging.DropDatabaseMessage; +import org.apache.hadoop.hive.metastore.messaging.DropFunctionMessage; +import org.apache.hadoop.hive.metastore.messaging.DropIndexMessage; +import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.DropTableMessage; +import org.apache.hadoop.hive.metastore.messaging.InsertMessage; +import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer; +import org.apache.hadoop.hive.metastore.messaging.MessageFactory; +import org.apache.thrift.TDeserializer; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; +import org.apache.thrift.protocol.TJSONProtocol; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.JsonParser; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.node.ObjectNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; + +/** + * The JSON implementation of the MessageFactory. Constructs JSON implementations of each + * message-type. + */ +public class JSONMessageFactory extends MessageFactory { + + private static final Logger LOG = LoggerFactory.getLogger(JSONMessageFactory.class.getName()); + + private static JSONMessageDeserializer deserializer = new JSONMessageDeserializer(); + + @Override + public MessageDeserializer getDeserializer() { + return deserializer; + } + + @Override + public String getVersion() { + return "0.1"; + } + + @Override + public String getMessageFormat() { + return "json"; + } + + @Override + public CreateDatabaseMessage buildCreateDatabaseMessage(Database db) { + return new JSONCreateDatabaseMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db.getName(), now()); + } + + @Override + public DropDatabaseMessage buildDropDatabaseMessage(Database db) { + return new JSONDropDatabaseMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db.getName(), now()); + } + + @Override + public CreateTableMessage buildCreateTableMessage(Table table) { + return new JSONCreateTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table, now()); + } + + @Override + public AlterTableMessage buildAlterTableMessage(Table before, Table after) { + return new JSONAlterTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, after, now()); + } + + @Override + public DropTableMessage buildDropTableMessage(Table table) { + return new JSONDropTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table.getDbName(), + table.getTableName(), now()); + } + + @Override + public AddPartitionMessage buildAddPartitionMessage(Table table, + Iterator partitionsIterator) { + return new JSONAddPartitionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table, + partitionsIterator, now()); + } + + @Override + public AlterPartitionMessage buildAlterPartitionMessage(Table table, Partition before, + Partition after) { + return new JSONAlterPartitionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table, before, after, + now()); + } + + @Override + public DropPartitionMessage buildDropPartitionMessage(Table table, + Iterator partitionsIterator) { + return new JSONDropPartitionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table.getDbName(), + table.getTableName(), getPartitionKeyValues(table, partitionsIterator), now()); + } + + @Override + public CreateFunctionMessage buildCreateFunctionMessage(Function fn) { + return new JSONCreateFunctionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, fn, now()); + } + + @Override + public DropFunctionMessage buildDropFunctionMessage(Function fn) { + return new JSONDropFunctionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, fn, now()); + } + + @Override + public CreateIndexMessage buildCreateIndexMessage(Index idx) { + return new JSONCreateIndexMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, idx, now()); + } + + @Override + public DropIndexMessage buildDropIndexMessage(Index idx) { + return new JSONDropIndexMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, idx, now()); + } + + @Override + public AlterIndexMessage buildAlterIndexMessage(Index before, Index after) { + return new JSONAlterIndexMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, before, after, now()); + } + + @Override + public InsertMessage buildInsertMessage(String db, String table, Map partKeyVals, + List files) { + return new JSONInsertMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db, table, partKeyVals, + files, now()); + } + + private long now() { + return System.currentTimeMillis() / 1000; + } + + static Map getPartitionKeyValues(Table table, Partition partition) { + Map partitionKeys = new LinkedHashMap(); + for (int i = 0; i < table.getPartitionKeysSize(); ++i) + partitionKeys.put(table.getPartitionKeys().get(i).getName(), partition.getValues().get(i)); + return partitionKeys; + } + + static List> getPartitionKeyValues(final Table table, + Iterator iterator) { + return Lists.newArrayList(Iterators.transform(iterator, + new com.google.common.base.Function>() { + @Override + public Map apply(@Nullable Partition partition) { + return getPartitionKeyValues(table, partition); + } + })); + } + + static String createTableObjJson(Table tableObj) throws TException { + TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); + return serializer.toString(tableObj, "UTF-8"); + } + + static String createPartitionObjJson(Partition partitionObj) throws TException { + TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); + return serializer.toString(partitionObj, "UTF-8"); + } + + static String createFunctionObjJson(Function functionObj) throws TException { + TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); + return serializer.toString(functionObj, "UTF-8"); + } + + static String createIndexObjJson(Index indexObj) throws TException { + TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); + return serializer.toString(indexObj, "UTF-8"); + } + + public static ObjectNode getJsonTree(NotificationEvent event) throws Exception { + JsonParser jsonParser = (new JsonFactory()).createJsonParser(event.getMessage()); + ObjectMapper mapper = new ObjectMapper(); + return mapper.readValue(jsonParser, ObjectNode.class); + } + + public static Table getTableObj(ObjectNode jsonTree) throws Exception { + TDeserializer deSerializer = new TDeserializer(new TJSONProtocol.Factory()); + Table tableObj = new Table(); + String tableJson = jsonTree.get("tableObjJson").asText(); + deSerializer.deserialize(tableObj, tableJson, "UTF-8"); + return tableObj; + } + + public static List getPartitionObjList(ObjectNode jsonTree) throws Exception { + TDeserializer deSerializer = new TDeserializer(new TJSONProtocol.Factory()); + List partitionObjList = new ArrayList(); + Partition partitionObj = new Partition(); + Iterator jsonArrayIterator = jsonTree.get("partitionListJson").iterator(); + while (jsonArrayIterator.hasNext()) { + deSerializer.deserialize(partitionObj, jsonArrayIterator.next().asText(), "UTF-8"); + partitionObjList.add(partitionObj); + } + return partitionObjList; + } + + public static Function getFunctionObj(ObjectNode jsonTree) throws Exception { + TDeserializer deSerializer = new TDeserializer(new TJSONProtocol.Factory()); + Function funcObj = new Function(); + String tableJson = jsonTree.get("functionObjJson").asText(); + deSerializer.deserialize(funcObj, tableJson, "UTF-8"); + return funcObj; + } + + public static Index getIndexObj(ObjectNode jsonTree) throws Exception { + return getIndexObj(jsonTree, "indexObjJson"); + } + + public static Index getIndexObj(ObjectNode jsonTree, String indexObjKey) throws Exception { + TDeserializer deSerializer = new TDeserializer(new TJSONProtocol.Factory()); + Index indexObj = new Index(); + String tableJson = jsonTree.get(indexObjKey).asText(); + deSerializer.deserialize(indexObj, tableJson, "UTF-8"); + return indexObj; + } +}