diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index a9474c4..ab7ea3c 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -250,6 +250,7 @@ private static URL checkConfigFile(File f) { HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS, HiveConf.ConfVars.METASTORE_EVENT_CLEAN_FREQ, HiveConf.ConfVars.METASTORE_EVENT_EXPIRY_DURATION, + HiveConf.ConfVars.METASTORE_EVENT_MESSAGE_FACTORY, HiveConf.ConfVars.METASTORE_FILTER_HOOK, HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL, HiveConf.ConfVars.METASTORE_END_FUNCTION_LISTENERS, @@ -794,6 +795,9 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal METASTORE_EVENT_EXPIRY_DURATION("hive.metastore.event.expiry.duration", "0s", new TimeValidator(TimeUnit.SECONDS), "Duration after which events expire from events table"), + METASTORE_EVENT_MESSAGE_FACTORY("hive.metastore.event.message.factory", + "org.apache.hadoop.hive.metastore.messaging.json.JSONMessageFactory", + "Factory class for making encoding and decoding messages in the events generated."), METASTORE_EXECUTE_SET_UGI("hive.metastore.execute.setugi", true, "In unsecure mode, setting this property to true will cause the metastore to execute DFS operations using \n" + "the client's reported user and group permissions. Note that this property must be set on \n" + diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index ea7520d..6f33167 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -45,10 +45,10 @@ import org.apache.hadoop.hive.metastore.events.DropTableEvent; import org.apache.hadoop.hive.metastore.events.InsertEvent; import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent; -import org.apache.hive.hcatalog.common.HCatConstants; -import org.apache.hive.hcatalog.messaging.MessageFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType; +import org.apache.hadoop.hive.metastore.messaging.MessageFactory; import java.util.concurrent.TimeUnit; @@ -121,10 +121,11 @@ public void onConfigChange(ConfigChangeEvent tableEvent) throws MetaException { * @param tableEvent table event. * @throws MetaException */ - public void onCreateTable (CreateTableEvent tableEvent) throws MetaException { + public void onCreateTable(CreateTableEvent tableEvent) throws MetaException { Table t = tableEvent.getTable(); - NotificationEvent event = new NotificationEvent(0, now(), - HCatConstants.HCAT_CREATE_TABLE_EVENT, msgFactory.buildCreateTableMessage(t).toString()); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.CREATE_TABLE.toString(), msgFactory + .buildCreateTableMessage(t).toString()); event.setDbName(t.getDbName()); event.setTableName(t.getTableName()); enqueue(event); @@ -134,10 +135,11 @@ public void onCreateTable (CreateTableEvent tableEvent) throws MetaException { * @param tableEvent table event. 
* @throws MetaException */ - public void onDropTable (DropTableEvent tableEvent) throws MetaException { + public void onDropTable(DropTableEvent tableEvent) throws MetaException { Table t = tableEvent.getTable(); - NotificationEvent event = new NotificationEvent(0, now(), - HCatConstants.HCAT_DROP_TABLE_EVENT, msgFactory.buildDropTableMessage(t).toString()); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.DROP_TABLE.toString(), msgFactory + .buildDropTableMessage(t).toString()); event.setDbName(t.getDbName()); event.setTableName(t.getTableName()); enqueue(event); @@ -147,12 +149,12 @@ public void onDropTable (DropTableEvent tableEvent) throws MetaException { * @param tableEvent alter table event * @throws MetaException */ - public void onAlterTable (AlterTableEvent tableEvent) throws MetaException { + public void onAlterTable(AlterTableEvent tableEvent) throws MetaException { Table before = tableEvent.getOldTable(); Table after = tableEvent.getNewTable(); - NotificationEvent event = new NotificationEvent(0, now(), - HCatConstants.HCAT_ALTER_TABLE_EVENT, - msgFactory.buildAlterTableMessage(before, after).toString()); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.ALTER_TABLE.toString(), msgFactory + .buildAlterTableMessage(before, after).toString()); event.setDbName(after.getDbName()); event.setTableName(after.getTableName()); enqueue(event); @@ -162,12 +164,11 @@ public void onAlterTable (AlterTableEvent tableEvent) throws MetaException { * @param partitionEvent partition event * @throws MetaException */ - public void onAddPartition (AddPartitionEvent partitionEvent) - throws MetaException { + public void onAddPartition(AddPartitionEvent partitionEvent) throws MetaException { Table t = partitionEvent.getTable(); - NotificationEvent event = new NotificationEvent(0, now(), - HCatConstants.HCAT_ADD_PARTITION_EVENT, - msgFactory.buildAddPartitionMessage(t, partitionEvent.getPartitionIterator()).toString()); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.ADD_PARTITION.toString(), msgFactory + .buildAddPartitionMessage(t, partitionEvent.getPartitionIterator()).toString()); event.setDbName(t.getDbName()); event.setTableName(t.getTableName()); enqueue(event); @@ -177,11 +178,11 @@ public void onAddPartition (AddPartitionEvent partitionEvent) * @param partitionEvent partition event * @throws MetaException */ - public void onDropPartition (DropPartitionEvent partitionEvent) throws MetaException { + public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException { Table t = partitionEvent.getTable(); - NotificationEvent event = new NotificationEvent(0, now(), - HCatConstants.HCAT_DROP_PARTITION_EVENT, - msgFactory.buildDropPartitionMessage(t, partitionEvent.getPartitionIterator()).toString()); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.DROP_PARTITION.toString(), msgFactory + .buildDropPartitionMessage(t, partitionEvent.getPartitionIterator()).toString()); event.setDbName(t.getDbName()); event.setTableName(t.getTableName()); enqueue(event); @@ -191,12 +192,12 @@ public void onDropPartition (DropPartitionEvent partitionEvent) throws MetaExce * @param partitionEvent partition event * @throws MetaException */ - public void onAlterPartition (AlterPartitionEvent partitionEvent) throws MetaException { + public void onAlterPartition(AlterPartitionEvent partitionEvent) throws MetaException { Partition before = partitionEvent.getOldPartition(); Partition after = 
partitionEvent.getNewPartition(); - NotificationEvent event = new NotificationEvent(0, now(), - HCatConstants.HCAT_ALTER_PARTITION_EVENT, - msgFactory.buildAlterPartitionMessage(partitionEvent.getTable(),before, after).toString()); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.ALTER_PARTITION.toString(), msgFactory + .buildAlterPartitionMessage(partitionEvent.getTable(), before, after).toString()); event.setDbName(before.getDbName()); event.setTableName(before.getTableName()); enqueue(event); @@ -206,11 +207,11 @@ public void onAlterPartition (AlterPartitionEvent partitionEvent) throws MetaEx * @param dbEvent database event * @throws MetaException */ - public void onCreateDatabase (CreateDatabaseEvent dbEvent) throws MetaException { + public void onCreateDatabase(CreateDatabaseEvent dbEvent) throws MetaException { Database db = dbEvent.getDatabase(); - NotificationEvent event = new NotificationEvent(0, now(), - HCatConstants.HCAT_CREATE_DATABASE_EVENT, - msgFactory.buildCreateDatabaseMessage(db).toString()); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.CREATE_DATABASE.toString(), msgFactory + .buildCreateDatabaseMessage(db).toString()); event.setDbName(db.getName()); enqueue(event); } @@ -219,11 +220,11 @@ public void onCreateDatabase (CreateDatabaseEvent dbEvent) throws MetaException * @param dbEvent database event * @throws MetaException */ - public void onDropDatabase (DropDatabaseEvent dbEvent) throws MetaException { + public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException { Database db = dbEvent.getDatabase(); - NotificationEvent event = new NotificationEvent(0, now(), - HCatConstants.HCAT_DROP_DATABASE_EVENT, - msgFactory.buildDropDatabaseMessage(db).toString()); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.DROP_DATABASE.toString(), msgFactory + .buildDropDatabaseMessage(db).toString()); event.setDbName(db.getName()); enqueue(event); } @@ -296,9 +297,10 @@ public void onAlterIndex (AlterIndexEvent indexEvent) throws MetaException { @Override public void onInsert(InsertEvent insertEvent) throws MetaException { - NotificationEvent event = new NotificationEvent(0, now(), HCatConstants.HCAT_INSERT_EVENT, - msgFactory.buildInsertMessage(insertEvent.getDb(), insertEvent.getTable(), - insertEvent.getPartitionKeyValues(), insertEvent.getFiles()).toString()); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.INSERT.toString(), msgFactory.buildInsertMessage( + insertEvent.getDb(), insertEvent.getTable(), insertEvent.getPartitionKeyValues(), + insertEvent.getFiles()).toString()); event.setDbName(insertEvent.getDb()); event.setTableName(insertEvent.getTable()); enqueue(event); diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java index a661962..2f0aeb3 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java @@ -18,28 +18,32 @@ */ package org.apache.hive.hcatalog.api; -import static junit.framework.Assert.assertEquals; -import static junit.framework.Assert.assertNull; -import static junit.framework.Assert.assertTrue; -import static junit.framework.Assert.fail; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static 
org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; -import junit.framework.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hive.hcatalog.common.HCatConstants; import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; import org.apache.hive.hcatalog.listener.DbNotificationListener; -import org.apache.hive.hcatalog.messaging.HCatEventMessage; +import org.apache.thrift.TDeserializer; +import org.apache.thrift.protocol.TJSONProtocol; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.JsonParser; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.node.ObjectNode; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -125,8 +129,16 @@ public void createTable() throws Exception { assertEquals(HCatConstants.HCAT_CREATE_TABLE_EVENT, event.getEventType()); assertEquals(dbName, event.getDbName()); assertEquals("hcatcreatetable", event.getTableName()); - assertTrue(event.getMessage().matches("\\{\"eventType\":\"CREATE_TABLE\",\"server\":\"\"," + - "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":\"hcatcreatetable\",\"timestamp\":[0-9]+}")); + + // Parse the message field + ObjectNode jsonTree = getJsonTree(event); + assertEquals(HCatConstants.HCAT_CREATE_TABLE_EVENT, jsonTree.get("eventType").asText()); + assertEquals("default", jsonTree.get("db").asText()); + assertEquals("hcatcreatetable", jsonTree.get("table").asText()); + Table tableObj = getTableObj(jsonTree); + assertEquals("default", tableObj.getDbName()); + assertEquals("hcatcreatetable", tableObj.getTableName()); + assertEquals(table.getSd(), tableObj.getSd()); } // TODO - Currently no way to test alter table, as this interface doesn't support alter table @@ -166,11 +178,8 @@ public void addPartition() throws Exception { String partName = "testpart"; Map partSpec = new HashMap(1); partSpec.put(partColName, partName); - hCatClient.addPartition( - HCatAddPartitionDesc.create( - new HCatPartition(table, partSpec, null) - ).build() - ); + hCatClient.addPartition(HCatAddPartitionDesc.create(new HCatPartition(table, partSpec, null)) + .build()); List events = hCatClient.getNextNotification(firstEventId, 0, null); assertEquals(2, events.size()); @@ -181,9 +190,6 @@ public void addPartition() throws Exception { assertEquals(HCatConstants.HCAT_ADD_PARTITION_EVENT, event.getEventType()); assertEquals("default", event.getDbName()); assertEquals(tableName, event.getTableName()); - assertTrue(event.getMessage().matches("\\{\"eventType\":\"ADD_PARTITION\",\"server\":\"\"," + - "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" + - "\"hcataddparttable\",\"timestamp\":[0-9]+,\"partitions\":\\[\\{\"pc\":\"testpart\"}]}")); } // TODO - currently no way to test alter partition, as HCatClient doesn't support it. 
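The regex assertions removed in the hunks above were coupled to the exact JSON layout of each notification message. The typed message classes introduced later in this patch expose the same fields through accessors, so a test (or any other notification consumer) can check the payload without string matching. The sketch below is illustrative only and not part of the patch: the helper name is hypothetical, and it assumes the configured message factory is the default JSON one, so that the deserializer obtained from MessageFactory.getInstance().getDeserializer() can decode the stored message. The expected values ("default", "hcataddparttable", "pc", "testpart") are the ones exercised by the addPartition test above.

// Hypothetical helper (sketch only): decode an ADD_PARTITION notification through the
// messaging API instead of matching the raw JSON string. Requires the
// org.apache.hadoop.hive.metastore.messaging imports (MessageFactory, MessageDeserializer,
// AddPartitionMessage) and org.apache.hadoop.hive.metastore.api.NotificationEvent.
private void assertAddPartitionMessage(NotificationEvent event) {
  MessageDeserializer deserializer = MessageFactory.getInstance().getDeserializer();
  AddPartitionMessage msg = deserializer.getAddPartitionMessage(event.getMessage());
  assertEquals("default", msg.getDB());                           // database recorded in the message
  assertEquals("hcataddparttable", msg.getTable());               // table recorded in the message
  assertEquals("testpart", msg.getPartitions().get(0).get("pc")); // key/value of the added partition
}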
@@ -265,4 +271,18 @@ public boolean accept(NotificationEvent event) { assertEquals(1, events.size()); assertEquals(firstEventId + 1, events.get(0).getEventId()); } + + private Table getTableObj(JsonNode jsonTree) throws Exception { + TDeserializer deSerializer = new TDeserializer(new TJSONProtocol.Factory()); + Table tableObj = new Table(); + String tableJson = jsonTree.get("tableObjJson").asText(); + deSerializer.deserialize(tableObj, tableJson, "UTF-8"); + return tableObj; + } + + private ObjectNode getJsonTree(HCatNotificationEvent event) throws Exception { + JsonParser jsonParser = (new JsonFactory()).createJsonParser(event.getMessage()); + ObjectMapper mapper = new ObjectMapper(); + return mapper.readValue(jsonParser, ObjectNode.class); + } } diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java index 4f97cf4..746699c 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java @@ -18,25 +18,18 @@ */ package org.apache.hive.hcatalog.listener; -import static junit.framework.Assert.assertEquals; -import static junit.framework.Assert.assertNull; -import static junit.framework.Assert.assertTrue; -import static junit.framework.Assert.fail; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; -import org.apache.hadoop.hive.metastore.api.Function; -import org.apache.hadoop.hive.metastore.api.FunctionType; -import org.apache.hadoop.hive.metastore.api.Index; -import org.apache.hadoop.hive.metastore.api.Order; -import org.apache.hadoop.hive.metastore.api.PrincipalType; -import org.apache.hadoop.hive.metastore.api.ResourceType; -import org.apache.hadoop.hive.metastore.api.ResourceUri; -import org.apache.htrace.fasterxml.jackson.core.JsonFactory; -import org.apache.htrace.fasterxml.jackson.core.JsonParser; -import org.apache.htrace.fasterxml.jackson.databind.JsonNode; -import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper; -import org.apache.htrace.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.thrift.TDeserializer; -import org.apache.thrift.protocol.TJSONProtocol; import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; @@ -45,29 +38,36 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.FireEventRequest; import org.apache.hadoop.hive.metastore.api.FireEventRequestData; +import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hadoop.hive.metastore.api.FunctionType; +import org.apache.hadoop.hive.metastore.api.Index; import org.apache.hadoop.hive.metastore.api.InsertEventRequestData; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; +import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.PrincipalType; +import 
org.apache.hadoop.hive.metastore.api.ResourceType; +import org.apache.hadoop.hive.metastore.api.ResourceUri; import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hive.hcatalog.common.HCatConstants; +import org.apache.htrace.fasterxml.jackson.core.JsonFactory; +import org.apache.htrace.fasterxml.jackson.core.JsonParser; +import org.apache.htrace.fasterxml.jackson.databind.JsonNode; +import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper; +import org.apache.htrace.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.thrift.TDeserializer; +import org.apache.thrift.protocol.TJSONProtocol; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - public class TestDbNotificationListener { private static final Logger LOG = LoggerFactory.getLogger(TestDbNotificationListener.class.getName()); private static final int EVENTS_TTL = 30; @@ -183,26 +183,37 @@ public void createTable() throws Exception { List cols = new ArrayList(); cols.add(new FieldSchema("col1", "int", "nocomment")); SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); - StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, - serde, null, null, emptyParameters); - Table table = new Table("mytable", "default", "me", startTime, startTime, 0, sd, null, - emptyParameters, null, null, null); + StorageDescriptor sd = + new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, serde, null, null, + emptyParameters); + Table table = + new Table("mytable", "default", "me", startTime, startTime, 0, sd, null, emptyParameters, + null, null, null); msClient.createTable(table); - + // Get the event NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null); assertEquals(1, rsp.getEventsSize()); - NotificationEvent event = rsp.getEvents().get(0); assertEquals(firstEventId + 1, event.getEventId()); assertTrue(event.getEventTime() >= startTime); assertEquals(HCatConstants.HCAT_CREATE_TABLE_EVENT, event.getEventType()); assertEquals("default", event.getDbName()); assertEquals("mytable", event.getTableName()); - assertTrue(event.getMessage().matches("\\{\"eventType\":\"CREATE_TABLE\",\"server\":\"\"," + - "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":\"mytable\",\"timestamp\":[0-9]+}")); - table = new Table("mytable2", "default", "me", startTime, startTime, 0, sd, null, - emptyParameters, null, null, null); + // Parse the message field + ObjectNode jsonTree = getJsonTree(event); + assertEquals(HCatConstants.HCAT_CREATE_TABLE_EVENT, jsonTree.get("eventType").asText()); + assertEquals("default", jsonTree.get("db").asText()); + assertEquals("mytable", jsonTree.get("table").asText()); + Table tableObj = getTableObj(jsonTree); + assertEquals("default", tableObj.getDbName()); + assertEquals("mytable", tableObj.getTableName()); + assertEquals("me", tableObj.getOwner()); + assertEquals(table.getSd(), tableObj.getSd()); + + table = + new Table("mytable2", "default", "me", startTime, startTime, 0, sd, null, emptyParameters, + null, null, null); 
DummyRawStoreFailEvent.setEventSucceed(false); try { msClient.createTable(table); @@ -223,11 +234,12 @@ public void alterTable() throws Exception { serde, null, null, emptyParameters); Table table = new Table("alttable", "default", "me", startTime, startTime, 0, sd, new ArrayList(), emptyParameters, null, null, null); + // Event 1 msClient.createTable(table); - cols.add(new FieldSchema("col2", "int", "")); table = new Table("alttable", "default", "me", startTime, startTime, 0, sd, new ArrayList(), emptyParameters, null, null, null); + // Event 2 msClient.alter_table("default", "alttable", table); NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null); @@ -239,9 +251,17 @@ public void alterTable() throws Exception { assertEquals(HCatConstants.HCAT_ALTER_TABLE_EVENT, event.getEventType()); assertEquals("default", event.getDbName()); assertEquals("alttable", event.getTableName()); - assertTrue(event.getMessage().matches("\\{\"eventType\":\"ALTER_TABLE\",\"server\":\"\"," + - "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":\"alttable\"," + - "\"timestamp\":[0-9]+}")); + + // Parse the message field + ObjectNode jsonTree = getJsonTree(event); + assertEquals(HCatConstants.HCAT_ALTER_TABLE_EVENT, jsonTree.get("eventType").asText()); + assertEquals("default", jsonTree.get("db").asText()); + assertEquals("alttable", jsonTree.get("table").asText()); + Table tableObj = getTableObj(jsonTree); + assertEquals("default", tableObj.getDbName()); + assertEquals("alttable", tableObj.getTableName()); + assertEquals("me", tableObj.getOwner()); + assertEquals(table.getSd(), tableObj.getSd()); DummyRawStoreFailEvent.setEventSucceed(false); try { @@ -319,9 +339,6 @@ public void addPartition() throws Exception { assertEquals(HCatConstants.HCAT_ADD_PARTITION_EVENT, event.getEventType()); assertEquals("default", event.getDbName()); assertEquals("addparttable", event.getTableName()); - assertTrue(event.getMessage().matches("\\{\"eventType\":\"ADD_PARTITION\",\"server\":\"\"," + - "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" + - "\"addparttable\",\"timestamp\":[0-9]+,\"partitions\":\\[\\{\"ds\":\"today\"}]}")); partition = new Partition(Arrays.asList("tomorrow"), "default", "tableDoesNotExist", startTime, startTime, sd, emptyParameters); @@ -365,10 +382,6 @@ public void alterPartition() throws Exception { assertEquals(HCatConstants.HCAT_ALTER_PARTITION_EVENT, event.getEventType()); assertEquals("default", event.getDbName()); assertEquals("alterparttable", event.getTableName()); - assertTrue(event.getMessage(), - event.getMessage().matches("\\{\"eventType\":\"ALTER_PARTITION\",\"server\":\"\"," + - "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":\"alterparttable\"," + - "\"timestamp\":[0-9]+,\"keyValues\":\\{\"ds\":\"today\"}}")); DummyRawStoreFailEvent.setEventSucceed(false); try { @@ -972,6 +985,14 @@ public void cleanupNotifs() throws Exception { assertEquals(0, rsp2.getEventsSize()); } + private Table getTableObj(JsonNode jsonTree) throws Exception { + TDeserializer deSerializer = new TDeserializer(new TJSONProtocol.Factory()); + Table tableObj = new Table(); + String tableJson = jsonTree.get("tableObjJson").asText(); + deSerializer.deserialize(tableObj, tableJson, "UTF-8"); + return tableObj; + } + private ObjectNode getJsonTree(NotificationEvent event) throws Exception { JsonParser jsonParser = (new JsonFactory()).createJsonParser(event.getMessage()); ObjectMapper mapper = new ObjectMapper(); diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AddPartitionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AddPartitionMessage.java new file mode 100644 index 0000000..6d65f7e --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AddPartitionMessage.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging; + +import java.util.List; +import java.util.Map; + +/** + * The HCat message sent when partition(s) are added to a table. + */ +public abstract class AddPartitionMessage extends EventMessage { + + protected AddPartitionMessage() { + super(EventType.ADD_PARTITION); + } + + /** + * Getter for name of table (where partitions are added). + * @return Table-name (String). + */ + public abstract String getTable(); + + /** + * Getter for list of partitions added. + * @return List of maps, where each map identifies values for each partition-key, for every added partition. + */ + public abstract List<Map<String, String>> getPartitions (); + + @Override + public EventMessage checkValid() { + if (getTable() == null) + throw new IllegalStateException("Table name unset."); + if (getPartitions() == null) + throw new IllegalStateException("Partition-list unset."); + return super.checkValid(); + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java new file mode 100644 index 0000000..23863f1 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hive.metastore.messaging; + +import java.util.Map; + +/** + * HCat message sent when a partition is altered.
+ */ +public abstract class AlterPartitionMessage extends EventMessage { + + protected AlterPartitionMessage() { + super(EventType.ALTER_PARTITION); + } + + public abstract String getTable(); + + public abstract Map getKeyValues(); + + @Override + public EventMessage checkValid() { + if (getTable() == null) throw new IllegalStateException("Table name unset."); + if (getKeyValues() == null) throw new IllegalStateException("Partition values unset"); + return super.checkValid(); + } +} + diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java new file mode 100644 index 0000000..6ffcd8b --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hive.metastore.messaging; + +/** + * HCat message sent when a table is Altered. + */ +public abstract class AlterTableMessage extends EventMessage { + + protected AlterTableMessage() { + super(EventType.ALTER_TABLE); + } + + public abstract String getTable(); + + @Override + public EventMessage checkValid() { + if (getTable() == null) throw new IllegalStateException("Table name unset."); + return super.checkValid(); + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateDatabaseMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateDatabaseMessage.java new file mode 100644 index 0000000..8be5d96 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateDatabaseMessage.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging; + +/** + * HCat message sent when a Database is created in HCatalog. 
+ */ +public abstract class CreateDatabaseMessage extends EventMessage { + + protected CreateDatabaseMessage() { + super(EventType.CREATE_DATABASE); + } + +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateTableMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateTableMessage.java new file mode 100644 index 0000000..531c394 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateTableMessage.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging; + +/** + * HCat message sent when a table is created in HCatalog. + */ +public abstract class CreateTableMessage extends EventMessage { + + protected CreateTableMessage() { + super(EventType.CREATE_TABLE); + } + + /** + * Getter for the name of table created in HCatalog. + * @return Table-name (String). + */ + public abstract String getTable(); + + @Override + public EventMessage checkValid() { + if (getTable() == null) + throw new IllegalStateException("Table name unset."); + return super.checkValid(); + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropDatabaseMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropDatabaseMessage.java new file mode 100644 index 0000000..6775fb9 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropDatabaseMessage.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging; + +/** + * HCat message sent when a Database is dropped from HCatalog. 
+ */ +public abstract class DropDatabaseMessage extends EventMessage { + + protected DropDatabaseMessage() { + super(EventType.DROP_DATABASE); + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropPartitionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropPartitionMessage.java new file mode 100644 index 0000000..cc41c94 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropPartitionMessage.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging; + +import java.util.List; +import java.util.Map; + +/** + * HCat message sent when a partition is dropped in HCatalog. + */ +public abstract class DropPartitionMessage extends EventMessage { + + protected DropPartitionMessage() { + super(EventType.DROP_PARTITION); + } + + public abstract String getTable(); + public abstract List> getPartitions (); + + @Override + public EventMessage checkValid() { + if (getTable() == null) + throw new IllegalStateException("Table name unset."); + if (getPartitions() == null) + throw new IllegalStateException("Partition-list unset."); + return super.checkValid(); + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropTableMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropTableMessage.java new file mode 100644 index 0000000..0149a64 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropTableMessage.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging; + +/** + * HCat message sent when a Table is dropped in HCatalog. + */ +public abstract class DropTableMessage extends EventMessage { + + protected DropTableMessage() { + super(EventType.DROP_TABLE); + } + + /** + * Getter for the name of the table being dropped. + * @return Table-name (String). 
+ */ + public abstract String getTable(); + + @Override + public EventMessage checkValid() { + if (getTable() == null) + throw new IllegalStateException("Table name unset."); + return super.checkValid(); + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java new file mode 100644 index 0000000..f7354b1 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging; + +/** + * Class representing messages emitted when Metastore operations are done. + * (E.g. Creation and deletion of databases, tables and partitions.) + */ +public abstract class EventMessage { + + /** + * Enumeration of all supported types of Metastore operations. + */ + public static enum EventType { + + CREATE_DATABASE(MessageFactory.CREATE_DATABASE_EVENT), + DROP_DATABASE(MessageFactory.DROP_DATABASE_EVENT), + CREATE_TABLE(MessageFactory.CREATE_TABLE_EVENT), + DROP_TABLE(MessageFactory.DROP_TABLE_EVENT), + ADD_PARTITION(MessageFactory.ADD_PARTITION_EVENT), + DROP_PARTITION(MessageFactory.DROP_PARTITION_EVENT), + ALTER_TABLE(MessageFactory.ALTER_TABLE_EVENT), + ALTER_PARTITION(MessageFactory.ALTER_PARTITION_EVENT), + INSERT(MessageFactory.INSERT_EVENT); + + private String typeString; + + EventType(String typeString) { + this.typeString = typeString; + } + + @Override + public String toString() { return typeString; } + } + + protected EventType eventType; + + protected EventMessage(EventType eventType) { + this.eventType = eventType; + } + + public EventType getEventType() { + return eventType; + } + + /** + * Getter for HCatalog Server's URL. + * (This is where the event originates from.) + * @return HCatalog Server's URL (String). + */ + public abstract String getServer(); + + /** + * Getter for the Kerberos principal of the HCatalog service. + * @return HCatalog Service Principal (String). + */ + public abstract String getServicePrincipal(); + + /** + * Getter for the name of the Database on which the Metastore operation is done. + * @return Database-name (String). + */ + public abstract String getDB(); + + /** + * Getter for the timestamp associated with the operation. + * @return Timestamp (Long - seconds since epoch). + */ + public abstract Long getTimestamp(); + + /** + * Class invariant. Checked after construction or deserialization. 
+ */ + public EventMessage checkValid() { + if (getServer() == null || getServicePrincipal() == null) + throw new IllegalStateException("Server-URL/Service-Principal shouldn't be null."); + if (getEventType() == null) + throw new IllegalStateException("Event-type unset."); + if (getDB() == null) + throw new IllegalArgumentException("DB-name unset."); + + return this; + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java new file mode 100644 index 0000000..932af7e --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hive.metastore.messaging; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.thrift.TException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +public class EventUtils { + + /** + * Utility function that constructs a notification filter to match a given db name and/or table name. + * If dbName == null, fetches all warehouse events. 
+ * If dbName != null, but tableName == null, fetches all events for the db + * If dbName != null && tableName != null, fetches all events for the specified table + * @param dbName Name of the database to match (null matches all databases). + * @param tableName Name of the table to match (null matches all tables in the db). + * @return NotificationFilter that performs the above matching. + */ + public static IMetaStoreClient.NotificationFilter getDbTblNotificationFilter(final String dbName, final String tableName){ + return new IMetaStoreClient.NotificationFilter() { + @Override + public boolean accept(NotificationEvent event) { + if (event == null){ + return false; // get rid of trivial case first, so that we can safely assume non-null + } + if (dbName == null){ + return true; // if our dbName is null, we're interested in all wh events + } + if (dbName.equalsIgnoreCase(event.getDbName())){ + if ( (tableName == null) + // if our dbName is equal, but tableName is blank, we're interested in this db-level event + || (tableName.equalsIgnoreCase(event.getTableName())) + // table level event that matches us + ){ + return true; + } + } + return false; + } + }; + } + + + public interface NotificationFetcher { + public int getBatchSize() throws IOException; + public long getCurrentNotificationEventId() throws IOException; + public List<NotificationEvent> getNextNotificationEvents( + long pos, IMetaStoreClient.NotificationFilter filter) throws IOException; + } + + // MetaStoreClient-based impl of NotificationFetcher + public static class MSClientNotificationFetcher implements NotificationFetcher{ + + private IMetaStoreClient msc = null; + private Integer batchSize = null; + + public MSClientNotificationFetcher(IMetaStoreClient msc){ + this.msc = msc; + } + + @Override + public int getBatchSize() throws IOException { + if (batchSize == null){ + try { + batchSize = Integer.parseInt( + msc.getConfigValue(HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX.varname, "50")); + // TODO: we're asking the metastore what its configuration for this var is - we may + // want to revisit to pull from client side instead. The reason I have it this way + // is because the metastore is more likely to have a reasonable config for this than + // an arbitrary client. + } catch (TException e) { + throw new IOException(e); + } + } + return batchSize; + } + + @Override + public long getCurrentNotificationEventId() throws IOException { + try { + return msc.getCurrentNotificationEventId().getEventId(); + } catch (TException e) { + throw new IOException(e); + } + } + + @Override + public List<NotificationEvent> getNextNotificationEvents( + long pos, IMetaStoreClient.NotificationFilter filter) throws IOException { + try { + return msc.getNextNotification(pos,getBatchSize(), filter).getEvents(); + } catch (TException e) { + throw new IOException(e); + } + } + } + + public static class NotificationEventIterator implements Iterator<NotificationEvent> { + + private NotificationFetcher nfetcher; + private IMetaStoreClient.NotificationFilter filter; + private int maxEvents; + + private Iterator<NotificationEvent> batchIter = null; + private List<NotificationEvent> batch = null; + private long pos; + private long maxPos; + private int eventCount; + + public NotificationEventIterator( + NotificationFetcher nfetcher, long eventFrom, int maxEvents, + String dbName, String tableName) throws IOException { + init(nfetcher, eventFrom, maxEvents, EventUtils.getDbTblNotificationFilter(dbName, tableName)); + // using init(..) instead of this(..)
because the EventUtils.getDbTblNotificationFilter + // is an operation that needs to run before delegating to the other ctor, and this messes up chaining + // ctors + } + + public NotificationEventIterator( + NotificationFetcher nfetcher, long eventFrom, int maxEvents, + IMetaStoreClient.NotificationFilter filter) throws IOException { + init(nfetcher,eventFrom,maxEvents,filter); + } + + private void init( + NotificationFetcher nfetcher, long eventFrom, int maxEvents, + IMetaStoreClient.NotificationFilter filter) throws IOException { + this.nfetcher = nfetcher; + this.filter = filter; + this.pos = eventFrom; + if (maxEvents < 1){ + // 0 or -1 implies fetch everything + this.maxEvents = Integer.MAX_VALUE; + } else { + this.maxEvents = maxEvents; + } + + this.eventCount = 0; + this.maxPos = nfetcher.getCurrentNotificationEventId(); + } + + private void fetchNextBatch() throws IOException { + batch = nfetcher.getNextNotificationEvents(pos, filter); + int batchSize = nfetcher.getBatchSize(); + while ( ((batch == null) || (batch.isEmpty())) && (pos < maxPos) ){ + // no valid events this batch, but we're still not done processing events + pos += batchSize; + batch = nfetcher.getNextNotificationEvents(pos,filter); + } + + if (batch == null){ + batch = new ArrayList(); + // instantiate empty list so that we don't error out on iterator fetching. + // If we're here, then the next check of pos will show our caller that + // that we've exhausted our event supply + } + batchIter = batch.iterator(); + } + + @Override + public boolean hasNext() { + if (eventCount >= maxEvents){ + // If we've already satisfied the number of events we were supposed to deliver, we end it. + return false; + } + if ((batchIter != null) && (batchIter.hasNext())){ + // If we have a valid batchIter and it has more elements, return them. + return true; + } + // If we're here, we want more events, and either batchIter is null, or batchIter + // has reached the end of the current batch. Let's fetch the next batch. + try { + fetchNextBatch(); + } catch (IOException e) { + // Regrettable that we have to wrap the IOException into a RuntimeException, + // but throwing the exception is the appropriate result here, and hasNext() + // signature will only allow RuntimeExceptions. Iterator.hasNext() really + // should have allowed IOExceptions + throw new RuntimeException(e); + } + // New batch has been fetched. If it's not empty, we have more elements to process. + return !batch.isEmpty(); + } + + @Override + public NotificationEvent next() { + eventCount++; + NotificationEvent ev = batchIter.next(); + pos = ev.getEventId(); + return ev; + } + + @Override + public void remove() { + throw new UnsupportedOperationException("remove() not supported on NotificationEventIterator"); + } + + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java new file mode 100644 index 0000000..fe747df --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging; + +import java.util.List; +import java.util.Map; + +/** + * HCat message sent when an insert is done to a table or partition. + */ +public abstract class InsertMessage extends EventMessage { + + protected InsertMessage() { + super(EventType.INSERT); + } + + /** + * Getter for the name of the table being inserted into. + * @return Table-name (String). + */ + public abstract String getTable(); + + /** + * Get the map of partition keyvalues. Will be null if this insert is to a table and not a + * partition. + * @return Map of partition keyvalues, or null. + */ + public abstract Map<String, String> getPartitionKeyValues(); + + /** + * Get the list of files created as a result of this DML operation. May be null. + * @return List of new files, or null. + */ + public abstract List<String> getFiles(); + + @Override + public EventMessage checkValid() { + if (getTable() == null) + throw new IllegalStateException("Table name unset."); + return super.checkValid(); + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java new file mode 100644 index 0000000..4a03343 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging; + +/** + * Interface for converting HCat events from String-form back to EventMessage instances. + */ +public abstract class MessageDeserializer { + + /** + * Method to construct EventMessage from string.
+ */ + public EventMessage getEventMessage(String eventTypeString, String messageBody) { + + switch (EventMessage.EventType.valueOf(eventTypeString)) { + case CREATE_DATABASE: + return getCreateDatabaseMessage(messageBody); + case DROP_DATABASE: + return getDropDatabaseMessage(messageBody); + case CREATE_TABLE: + return getCreateTableMessage(messageBody); + case ALTER_TABLE: + return getAlterTableMessage(messageBody); + case DROP_TABLE: + return getDropTableMessage(messageBody); + case ADD_PARTITION: + return getAddPartitionMessage(messageBody); + case ALTER_PARTITION: + return getAlterPartitionMessage(messageBody); + case DROP_PARTITION: + return getDropPartitionMessage(messageBody); + case INSERT: + return getInsertMessage(messageBody); + + default: + throw new IllegalArgumentException("Unsupported event-type: " + eventTypeString); + } + } + + /** + * Method to de-serialize CreateDatabaseMessage instance. + */ + public abstract CreateDatabaseMessage getCreateDatabaseMessage(String messageBody); + + /** + * Method to de-serialize DropDatabaseMessage instance. + */ + public abstract DropDatabaseMessage getDropDatabaseMessage(String messageBody); + + /** + * Method to de-serialize CreateTableMessage instance. + */ + public abstract CreateTableMessage getCreateTableMessage(String messageBody); + + /** + * Method to de-serialize AlterTableMessge + * @param messageBody string message + * @return object message + */ + public abstract AlterTableMessage getAlterTableMessage(String messageBody); + + /** + * Method to de-serialize DropTableMessage instance. + */ + public abstract DropTableMessage getDropTableMessage(String messageBody); + + /** + * Method to de-serialize AddPartitionMessage instance. + */ + public abstract AddPartitionMessage getAddPartitionMessage(String messageBody); + + /** + * Method to deserialize AlterPartitionMessage + * @param messageBody the message in serialized form + * @return message in object form + */ + public abstract AlterPartitionMessage getAlterPartitionMessage(String messageBody); + + /** + * Method to de-serialize DropPartitionMessage instance. + */ + public abstract DropPartitionMessage getDropPartitionMessage(String messageBody); + + /** + * Method to deserialize InsertMessage + * @param messageBody the message in serialized form + * @return message in object form + */ + public abstract InsertMessage getInsertMessage(String messageBody); + + // Protection against construction. + protected MessageDeserializer() {} +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java new file mode 100644 index 0000000..ae278f6 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging; + +import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.util.ReflectionUtils; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * Abstract Factory for the construction of HCatalog message instances. + */ +public abstract class MessageFactory { + + // Common name constants for event messages + public static final String ADD_PARTITION_EVENT = "ADD_PARTITION"; + public static final String ALTER_PARTITION_EVENT = "ALTER_PARTITION"; + public static final String DROP_PARTITION_EVENT = "DROP_PARTITION"; + public static final String CREATE_TABLE_EVENT = "CREATE_TABLE"; + public static final String ALTER_TABLE_EVENT = "ALTER_TABLE"; + public static final String DROP_TABLE_EVENT = "DROP_TABLE"; + public static final String CREATE_DATABASE_EVENT = "CREATE_DATABASE"; + public static final String DROP_DATABASE_EVENT = "DROP_DATABASE"; + public static final String INSERT_EVENT = "INSERT"; + + + private static MessageFactory instance = null; + + protected static final HiveConf hiveConf = new HiveConf(); + static { + hiveConf.addResource("hive-site.xml"); + } + + // This parameter is retained for legacy reasons, in case someone implemented custom + // factories. This, however, should not be the case, since this api was intended to + // be internal-only, and we should manage the jms and json implementations without + // needing this parameter. Marking as deprecated, for removal by 2.4 - see corresponding + // note on the getDeserializer(String,String) method + @Deprecated + private static final String CONF_LABEL_HCAT_MESSAGE_FACTORY_IMPL_PREFIX = "hcatalog.message.factory.impl."; + + protected static final String MS_SERVER_URL = hiveConf.get(HiveConf.ConfVars.METASTOREURIS.name(), ""); + protected static final String MS_SERVICE_PRINCIPAL = hiveConf.get(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.name(), ""); + + /** + * Getter for MessageFactory instance. + */ + public static MessageFactory getInstance() { + if (instance == null) { + instance = + getInstance(hiveConf.get(HiveConf.ConfVars.METASTORE_EVENT_MESSAGE_FACTORY.varname)); + } + return instance; + } + + private static MessageFactory getInstance(String className) { + try { + return (MessageFactory)ReflectionUtils.newInstance(JavaUtils.loadClass(className), hiveConf); + } + catch (ClassNotFoundException classNotFound) { + throw new IllegalStateException("Could not construct MessageFactory implementation: ", classNotFound); + } + } + + /** + * Getter for MessageDeserializer, corresponding to the specified format and version. + * @param format Serialization format for notifications. + * @param version Version of serialization format (currently ignored.) + * @return MessageDeserializer. 
+ */ + public static MessageDeserializer getDeserializer(String format, + String version) { + return getInstance(hiveConf.get(CONF_LABEL_HCAT_MESSAGE_FACTORY_IMPL_PREFIX + format, + HiveConf.ConfVars.METASTORE_EVENT_MESSAGE_FACTORY.varname)).getDeserializer(); + // Note: The reason this method exists outside the no-arg getDeserializer method is in + // case there is a user-implemented MessageFactory that's used, and some of the messages + // are in an older format and the rest in another. Then, whichever MessageFactory is the default + // is irrelevant; we should always use the one that created the message to deserialize it. + // + // There exist only 2 implementations of this - json and jms + // + // Additional note: rather than a config parameter, does it make sense to have + // this use jdbc-like semantics, where each available MessageFactory registers + // itself for discoverability? Might be worth pursuing. + } + + public abstract MessageDeserializer getDeserializer(); + + /** + * Getter for version-string, corresponding to all constructed messages. + */ + public abstract String getVersion(); + + /** + * Getter for message-format. + */ + public abstract String getMessageFormat(); + + /** + * Factory method for CreateDatabaseMessage. + * @param db The Database being added. + * @return CreateDatabaseMessage instance. + */ + public abstract CreateDatabaseMessage buildCreateDatabaseMessage(Database db); + + /** + * Factory method for DropDatabaseMessage. + * @param db The Database being dropped. + * @return DropDatabaseMessage instance. + */ + public abstract DropDatabaseMessage buildDropDatabaseMessage(Database db); + + /** + * Factory method for CreateTableMessage. + * @param table The Table being created. + * @return CreateTableMessage instance. + */ + public abstract CreateTableMessage buildCreateTableMessage(Table table); + + /** + * Factory method for AlterTableMessage. Unlike most of these calls, this one can return null, + * which means no message should be sent. This is because there are many flavors of alter + * table (add column, add partition, etc.). Some are covered elsewhere (like add partition) + * and some are not yet supported. + * @param before The table before the alter + * @param after The table after the alter + * @return AlterTableMessage instance, or null if no message should be sent + */ + public abstract AlterTableMessage buildAlterTableMessage(Table before, Table after); + + /** + * Factory method for DropTableMessage. + * @param table The Table being dropped. + * @return DropTableMessage instance. + */ + public abstract DropTableMessage buildDropTableMessage(Table table); + + /** + * Factory method for AddPartitionMessage. + * @param table The Table to which the partitions are added. + * @param partitions Iterator over the Partitions being added. + * @return AddPartitionMessage instance. + */ + public abstract AddPartitionMessage buildAddPartitionMessage(Table table, Iterator partitions); + + /** + * Factory method for building AlterPartitionMessage + * @param table The table in which the partition is being altered + * @param before The partition before it was altered + * @param after The partition after it was altered + * @return a new AlterPartitionMessage + */ + public abstract AlterPartitionMessage buildAlterPartitionMessage(Table table, Partition before, + Partition after); + + /** + * Factory method for DropPartitionMessage. + * @param table The Table from which the partition is dropped. + * @param partitions Iterator over the partitions being dropped. + * @return DropPartitionMessage instance.
+ */ + public abstract DropPartitionMessage buildDropPartitionMessage(Table table, Iterator partitions); + + /** + * Factory method for building insert message + * @param db Name of the database the insert occurred in + * @param table Name of the table the insert occurred in + * @param partVals Partition values for the partition that the insert occurred in, may be null + * if the insert was done into a non-partitioned table + * @param files List of files created as a result of the insert, may be null. + * @return instance of InsertMessage + */ + public abstract InsertMessage buildInsertMessage(String db, String table, + Map partVals, List files); +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPartitionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPartitionMessage.java new file mode 100644 index 0000000..8bc870a --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPartitionMessage.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging.json; + +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage; +import org.apache.thrift.TException; +import org.codehaus.jackson.annotate.JsonProperty; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * JSON implementation of AddPartitionMessage. + */ +public class JSONAddPartitionMessage extends AddPartitionMessage { + + @JsonProperty + String server, servicePrincipal, db, table, tableObjJson; + + @JsonProperty + Long timestamp; + + @JsonProperty + List> partitions; + + @JsonProperty + List partitionObjJson; + + /** + * Default Constructor. Required for Jackson. 
+ */ + public JSONAddPartitionMessage() { + } + + public JSONAddPartitionMessage(String server, String servicePrincipal, String db, String table, + List> partitions, Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = db; + this.table = table; + this.partitions = partitions; + this.timestamp = timestamp; + checkValid(); + } + + public JSONAddPartitionMessage(String server, String servicePrincipal, Table tableObj, + Iterator partitionsIterator, Long timestamp) { + this(server, servicePrincipal, tableObj.getDbName(), tableObj.getTableName(), + JSONMessageFactory.getPartitionKeyValues(tableObj, partitionsIterator), timestamp); + try { + this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj); + this.partitionObjJson = JSONMessageFactory.createPartitionObjJson(partitionsIterator); + } catch (TException e) { + throw new IllegalArgumentException("Could not serialize: ", e); + } + } + + @Override + public String getServer() { + return server; + } + + @Override + public String getServicePrincipal() { + return servicePrincipal; + } + + @Override + public String getDB() { + return db; + } + + @Override + public String getTable() { + return table; + } + + @Override + public Long getTimestamp() { + return timestamp; + } + + @Override + public List> getPartitions() { + return partitions; + } + + public String getTableObjJson() { + return tableObjJson; + } + + public List getPartitionObjJson() { + return partitionObjJson; + } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java new file mode 100644 index 0000000..4827234 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hive.metastore.messaging.json; + +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage; +import org.apache.thrift.TException; +import org.codehaus.jackson.annotate.JsonProperty; + +import java.util.Map; + +/** + * JSON alter partition message + */ +public class JSONAlterPartitionMessage extends AlterPartitionMessage { + + @JsonProperty + String server, servicePrincipal, db, table, tableObjJson, partitionObjJson; + + @JsonProperty + Long timestamp; + + @JsonProperty + Map<String, String> keyValues; + + /** + * Default constructor, needed for Jackson. + */ + public JSONAlterPartitionMessage() { + } + + public JSONAlterPartitionMessage(String server, String servicePrincipal, String db, String table, + Map<String, String> keyValues, Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = db; + this.table = table; + this.timestamp = timestamp; + this.keyValues = keyValues; + checkValid(); + } + + public JSONAlterPartitionMessage(String server, String servicePrincipal, Table tableObj, + Partition partitionObjBefore, Partition partitionObjAfter, Long timestamp) { + this(server, servicePrincipal, tableObj.getDbName(), tableObj.getTableName(), + JSONMessageFactory.getPartitionKeyValues(tableObj, partitionObjBefore), timestamp); + try { + this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj); + this.partitionObjJson = JSONMessageFactory.createPartitionObjJson(partitionObjAfter); + } catch (TException e) { + throw new IllegalArgumentException("Could not serialize: ", e); + } + } + + @Override + public String getServer() { + return server; + } + + @Override + public String getServicePrincipal() { + return servicePrincipal; + } + + @Override + public String getDB() { + return db; + } + + @Override + public Long getTimestamp() { + return timestamp; + } + + @Override + public String getTable() { + return table; + } + + @Override + public Map<String, String> getKeyValues() { + return keyValues; + } + + public String getTableObjJson() { + return tableObjJson; + } + + public String getPartitionObjJson() { + return partitionObjJson; + } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } catch (Exception e) { + throw new IllegalArgumentException("Could not serialize: ", e); + } + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java new file mode 100644 index 0000000..c6e20ce --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hive.metastore.messaging.json; + +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage; +import org.apache.thrift.TException; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * JSON alter table message + */ +public class JSONAlterTableMessage extends AlterTableMessage { + + @JsonProperty + String server, servicePrincipal, db, table, tableObjJson; + + @JsonProperty + Long timestamp; + + /** + * Default constructor, needed for Jackson. + */ + public JSONAlterTableMessage() { + } + + public JSONAlterTableMessage(String server, String servicePrincipal, String db, String table, + Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = db; + this.table = table; + this.timestamp = timestamp; + checkValid(); + } + + public JSONAlterTableMessage(String server, String servicePrincipal, Table tableObj, + Long timestamp) { + this(server, servicePrincipal, tableObj.getDbName(), tableObj.getTableName(), timestamp); + try { + this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj); + } catch (TException e) { + throw new IllegalArgumentException("Could not serialize: ", e); + } + } + + @Override + public String getServer() { + return server; + } + + @Override + public String getServicePrincipal() { + return servicePrincipal; + } + + @Override + public String getDB() { + return db; + } + + @Override + public Long getTimestamp() { + return timestamp; + } + + @Override + public String getTable() { + return table; + } + + public String getTableObjJson() { + return tableObjJson; + } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } catch (Exception e) { + throw new IllegalArgumentException("Could not serialize: ", e); + } + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateDatabaseMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateDatabaseMessage.java new file mode 100644 index 0000000..f8717b2 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateDatabaseMessage.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging.json; + +import org.apache.hadoop.hive.metastore.messaging.CreateDatabaseMessage; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * JSON Implementation of CreateDatabaseMessage. 
+ */ +public class JSONCreateDatabaseMessage extends CreateDatabaseMessage { + + @JsonProperty + String server, servicePrincipal, db; + + @JsonProperty + Long timestamp; + + /** + * Default constructor, required for Jackson. + */ + public JSONCreateDatabaseMessage() {} + + public JSONCreateDatabaseMessage(String server, String servicePrincipal, String db, Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = db; + this.timestamp = timestamp; + checkValid(); + } + + @Override + public String getDB() { return db; } + + @Override + public String getServer() { return server; } + + @Override + public String getServicePrincipal() { return servicePrincipal; } + + @Override + public Long getTimestamp() { return timestamp; } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } + +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateTableMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateTableMessage.java new file mode 100644 index 0000000..aa737ca --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateTableMessage.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging.json; + +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage; +import org.apache.thrift.TException; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * JSON implementation of CreateTableMessage. + */ +public class JSONCreateTableMessage extends CreateTableMessage { + + @JsonProperty + String server, servicePrincipal, db, table, tableObjJson; + @JsonProperty + Long timestamp; + + /** + * Default constructor, needed for Jackson. 
+ */ + public JSONCreateTableMessage() { + } + + public JSONCreateTableMessage(String server, String servicePrincipal, String db, String table, + Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = db; + this.table = table; + this.timestamp = timestamp; + checkValid(); + } + + public JSONCreateTableMessage(String server, String servicePrincipal, Table tableObj, + Long timestamp) { + this(server, servicePrincipal, tableObj.getDbName(), tableObj.getTableName(), timestamp); + try { + this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj); + } catch (TException e) { + throw new IllegalArgumentException("Could not serialize: ", e); + } + } + + @Override + public String getServer() { + return server; + } + + @Override + public String getServicePrincipal() { + return servicePrincipal; + } + + @Override + public String getDB() { + return db; + } + + @Override + public Long getTimestamp() { + return timestamp; + } + + @Override + public String getTable() { + return table; + } + + public String getTableObjJson() { + return tableObjJson; + } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropDatabaseMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropDatabaseMessage.java new file mode 100644 index 0000000..be17e6d --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropDatabaseMessage.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging.json; + +import org.apache.hadoop.hive.metastore.messaging.DropDatabaseMessage; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * JSON implementation of DropDatabaseMessage. + */ +public class JSONDropDatabaseMessage extends DropDatabaseMessage { + + @JsonProperty + String server, servicePrincipal, db; + + @JsonProperty + Long timestamp; + + /** + * Default constructor, required for Jackson. 
+ */ + public JSONDropDatabaseMessage() {} + + public JSONDropDatabaseMessage(String server, String servicePrincipal, String db, Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = db; + this.timestamp = timestamp; + checkValid(); + } + + + @Override + public String getServer() { return server; } + + @Override + public String getServicePrincipal() { return servicePrincipal; } + + @Override + public String getDB() { return db; } + + @Override + public Long getTimestamp() { return timestamp; } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropPartitionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropPartitionMessage.java new file mode 100644 index 0000000..b8ea224 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropPartitionMessage.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging.json; + +import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage; +import org.codehaus.jackson.annotate.JsonProperty; + +import java.util.List; +import java.util.Map; + +/** + * JSON implementation of DropPartitionMessage. + */ +public class JSONDropPartitionMessage extends DropPartitionMessage { + + @JsonProperty + String server, servicePrincipal, db, table; + + @JsonProperty + Long timestamp; + + @JsonProperty + List> partitions; + + /** + * Default Constructor. Required for Jackson. 
+ */ + public JSONDropPartitionMessage() { + } + + public JSONDropPartitionMessage(String server, String servicePrincipal, String db, String table, + List> partitions, Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = db; + this.table = table; + this.partitions = partitions; + this.timestamp = timestamp; + checkValid(); + } + + @Override + public String getServer() { + return server; + } + + @Override + public String getServicePrincipal() { + return servicePrincipal; + } + + @Override + public String getDB() { + return db; + } + + @Override + public String getTable() { + return table; + } + + @Override + public Long getTimestamp() { + return timestamp; + } + + @Override + public List> getPartitions() { + return partitions; + } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropTableMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropTableMessage.java new file mode 100644 index 0000000..635ab61 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropTableMessage.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging.json; + +import org.apache.hadoop.hive.metastore.messaging.DropTableMessage; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * JSON implementation of DropTableMessage. + */ +public class JSONDropTableMessage extends DropTableMessage { + + @JsonProperty + String server, servicePrincipal, db, table; + + @JsonProperty + Long timestamp; + + /** + * Default constructor, needed for Jackson. 
+ */ + public JSONDropTableMessage() { + } + + public JSONDropTableMessage(String server, String servicePrincipal, String db, String table, + Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = db; + this.table = table; + this.timestamp = timestamp; + checkValid(); + } + + @Override + public String getTable() { + return table; + } + + @Override + public String getServer() { + return server; + } + + @Override + public String getServicePrincipal() { + return servicePrincipal; + } + + @Override + public String getDB() { + return db; + } + + @Override + public Long getTimestamp() { + return timestamp; + } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } + +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java new file mode 100644 index 0000000..ef89b17 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging.json; + +import org.apache.hadoop.hive.metastore.messaging.InsertMessage; +import org.codehaus.jackson.annotate.JsonProperty; + +import java.util.List; +import java.util.Map; + +/** + * JSON implementation of InsertMessage. + */ +public class JSONInsertMessage extends InsertMessage { + + @JsonProperty + String server, servicePrincipal, db, table; + + @JsonProperty + Long timestamp; + + @JsonProperty + List files; + + @JsonProperty + Map partKeyVals; + + /** + * Default constructor, needed for Jackson.
+ */ + public JSONInsertMessage() {} + + public JSONInsertMessage(String server, String servicePrincipal, String db, String table, + Map partKeyVals, List files, Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = db; + this.table = table; + this.timestamp = timestamp; + this.partKeyVals = partKeyVals; + this.files = files; + checkValid(); + } + + + @Override + public String getTable() { return table; } + + @Override + public String getServer() { return server; } + + @Override + public Map getPartitionKeyValues() { + return partKeyVals; + } + + @Override + public List getFiles() { + return files; + } + + @Override + public String getServicePrincipal() { return servicePrincipal; } + + @Override + public String getDB() { return db; } + + @Override + public Long getTimestamp() { return timestamp; } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } + +} \ No newline at end of file diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java new file mode 100644 index 0000000..84c392a --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging.json; + +import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage; +import org.apache.hadoop.hive.metastore.messaging.CreateDatabaseMessage; +import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage; +import org.apache.hadoop.hive.metastore.messaging.DropDatabaseMessage; +import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.DropTableMessage; +import org.apache.hadoop.hive.metastore.messaging.InsertMessage; +import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer; +import org.codehaus.jackson.map.DeserializationConfig; +import org.codehaus.jackson.map.ObjectMapper; + +/** + * MessageDeserializer implementation, for deserializing from JSON strings. + */ +public class JSONMessageDeserializer extends MessageDeserializer { + + static ObjectMapper mapper = new ObjectMapper(); // Thread-safe. 
+ + static { + mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } + + @Override + public CreateDatabaseMessage getCreateDatabaseMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONCreateDatabaseMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct JSONCreateDatabaseMessage.", exception); + } + } + + @Override + public DropDatabaseMessage getDropDatabaseMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONDropDatabaseMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct JSONDropDatabaseMessage.", exception); + } + } + + @Override + public CreateTableMessage getCreateTableMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONCreateTableMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct JSONCreateTableMessage.", exception); + } + } + + @Override + public AlterTableMessage getAlterTableMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONAlterTableMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct appropriate alter table type.", + exception); + } + } + + @Override + public DropTableMessage getDropTableMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONDropTableMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct JSONDropTableMessage.", exception); + } + } + + @Override + public AddPartitionMessage getAddPartitionMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONAddPartitionMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct AddPartitionMessage.", exception); + } + } + + @Override + public AlterPartitionMessage getAlterPartitionMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONAlterPartitionMessage.class); + } catch (Exception e) { + throw new IllegalArgumentException("Could not construct AlterPartitionMessage.", e); + } + } + + @Override + public DropPartitionMessage getDropPartitionMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONDropPartitionMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct DropPartitionMessage.", exception); + } + } + + @Override + public InsertMessage getInsertMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONInsertMessage.class); + } catch (Exception e) { + throw new IllegalArgumentException("Could not construct InsertMessage", e); + } + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java new file mode 100644 index 0000000..916dce2 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging.json; + +import com.google.common.base.Function; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage; +import org.apache.hadoop.hive.metastore.messaging.CreateDatabaseMessage; +import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage; +import org.apache.hadoop.hive.metastore.messaging.DropDatabaseMessage; +import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.DropTableMessage; +import org.apache.hadoop.hive.metastore.messaging.InsertMessage; +import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer; +import org.apache.hadoop.hive.metastore.messaging.MessageFactory; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; +import org.apache.thrift.protocol.TJSONProtocol; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * The JSON implementation of the MessageFactory. Constructs JSON implementations of each + * message-type. 
+ */ +public class JSONMessageFactory extends MessageFactory { + + private static final Logger LOG = LoggerFactory.getLogger(JSONMessageFactory.class.getName()); + + private static JSONMessageDeserializer deserializer = new JSONMessageDeserializer(); + + @Override + public MessageDeserializer getDeserializer() { + return deserializer; + } + + @Override + public String getVersion() { + return "0.1"; + } + + @Override + public String getMessageFormat() { + return "json"; + } + + @Override + public CreateDatabaseMessage buildCreateDatabaseMessage(Database db) { + return new JSONCreateDatabaseMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db.getName(), + now()); + } + + @Override + public DropDatabaseMessage buildDropDatabaseMessage(Database db) { + return new JSONDropDatabaseMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db.getName(), now()); + } + + @Override + public CreateTableMessage buildCreateTableMessage(Table table) { + return new JSONCreateTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table, now()); + } + + @Override + public AlterTableMessage buildAlterTableMessage(Table before, Table after) { + return new JSONAlterTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, after, now()); + } + + @Override + public DropTableMessage buildDropTableMessage(Table table) { + return new JSONDropTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table.getDbName(), + table.getTableName(), now()); + } + + @Override + public AddPartitionMessage buildAddPartitionMessage(Table table, + Iterator partitionsIterator) { + return new JSONAddPartitionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table, + partitionsIterator, now()); + } + + @Override + public AlterPartitionMessage buildAlterPartitionMessage(Table table, Partition before, + Partition after) { + return new JSONAlterPartitionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table, before, + after, now()); + } + + @Override + public DropPartitionMessage buildDropPartitionMessage(Table table, + Iterator partitionsIterator) { + return new JSONDropPartitionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table.getDbName(), + table.getTableName(), getPartitionKeyValues(table, partitionsIterator), now()); + } + + @Override + public InsertMessage buildInsertMessage(String db, String table, Map partKeyVals, + List files) { + return new JSONInsertMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db, table, partKeyVals, + files, now()); + } + + private long now() { + return System.currentTimeMillis() / 1000; + } + + static Map getPartitionKeyValues(Table table, Partition partition) { + Map partitionKeys = new LinkedHashMap(); + for (int i = 0; i < table.getPartitionKeysSize(); ++i) + partitionKeys.put(table.getPartitionKeys().get(i).getName(), partition.getValues().get(i)); + return partitionKeys; + } + + static List> getPartitionKeyValues(final Table table, + Iterator iterator) { + return Lists.newArrayList(Iterators.transform(iterator, + new Function>() { + @Override + public Map apply(@Nullable Partition partition) { + return getPartitionKeyValues(table, partition); + } + })); + } + + static String createTableObjJson(Table tableObj) throws TException { + TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); + return serializer.toString(tableObj, "UTF-8"); + } + + static String createPartitionObjJson(Partition partitionObj) throws TException { + TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); + return serializer.toString(partitionObj, "UTF-8"); + } + + static List createPartitionObjJson(Iterator partitionsIterator) + throws 
TException { + List partitionsJson = new ArrayList(); + TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); + while (partitionsIterator.hasNext()) { + partitionsJson.add(serializer.toString(partitionsIterator.next(), "UTF-8")); + } + return partitionsJson; + } +}
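Usage note (reviewer addition, not part of the patch): with this change the message factory implementation is selected through hive.metastore.event.message.factory (defaulting to JSONMessageFactory), and consumers decode the strings stored in notification events through MessageDeserializer.getEventMessage(). The following is a minimal sketch of the intended round trip, using only APIs introduced in this patch; it assumes EventMessage (with its nested EventType enum) lives in the same org.apache.hadoop.hive.metastore.messaging package as the classes above, and the class and variable names (MessagingRoundTripSketch, tbl) are illustrative only.

import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
import org.apache.hadoop.hive.metastore.messaging.MessageFactory;

public class MessagingRoundTripSketch {
  public static void main(String[] args) {
    // Stand-in for a Table object that would normally arrive via a metastore listener event.
    Table tbl = new Table();
    tbl.setDbName("default");
    tbl.setTableName("sample_table");

    // Producer side (e.g. a notification listener): build the event-type and message-body
    // strings that get persisted alongside the notification.
    MessageFactory factory = MessageFactory.getInstance();
    String eventType = EventType.CREATE_TABLE.toString();
    String messageBody = factory.buildCreateTableMessage(tbl).toString(); // JSON with the default factory

    // Consumer side: decode the stored strings back into a typed message.
    MessageDeserializer deserializer = factory.getDeserializer();
    CreateTableMessage msg =
        (CreateTableMessage) deserializer.getEventMessage(eventType, messageBody);
    System.out.println("Created table: " + msg.getDB() + "." + msg.getTable());
  }
}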