diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterTableMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterTableMessage.java index b057d4a..c13a263 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterTableMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterTableMessage.java @@ -18,21 +18,20 @@ */ package org.apache.hive.hcatalog.messaging.json; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hive.hcatalog.common.HCatConstants; +import java.io.IOException; + +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hive.hcatalog.messaging.AlterTableMessage; +import org.apache.thrift.TException; import org.codehaus.jackson.annotate.JsonProperty; -import java.util.ArrayList; -import java.util.List; - /** * JSON alter table message */ public class JSONAlterTableMessage extends AlterTableMessage { @JsonProperty - String server, servicePrincipal, db, table; + String server, servicePrincipal, db, table, tableObjJson; @JsonProperty Long timestamp; @@ -40,13 +39,11 @@ /** * Default constructor, needed for Jackson. */ - public JSONAlterTableMessage() {} + public JSONAlterTableMessage() { + } - public JSONAlterTableMessage(String server, - String servicePrincipal, - String db, - String table, - Long timestamp) { + public JSONAlterTableMessage(String server, String servicePrincipal, String db, String table, + Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; this.db = db; @@ -55,6 +52,15 @@ public JSONAlterTableMessage(String server, checkValid(); } + public JSONAlterTableMessage(String server, String servicePrincipal, Table tableObj, + Long timestamp) { + this(server, servicePrincipal, tableObj.getDbName(), tableObj.getTableName(), timestamp); + try { + this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj); + } catch (IOException | TException e) { + throw new IllegalArgumentException("Could not serialize: ", e); + } + } @Override public String getServer() { @@ -81,6 +87,10 @@ public String getTable() { return table; } + public String getTableObjJson() { + return tableObjJson; + } + @Override public String toString() { try { diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateTableMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateTableMessage.java index 9c66730..0f6b945 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateTableMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateTableMessage.java @@ -19,7 +19,11 @@ package org.apache.hive.hcatalog.messaging.json; +import java.io.IOException; + +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hive.hcatalog.messaging.CreateTableMessage; +import org.apache.thrift.TException; import org.codehaus.jackson.annotate.JsonProperty; /** @@ -28,17 +32,18 @@ public class JSONCreateTableMessage extends CreateTableMessage { @JsonProperty - String server, servicePrincipal, db, table; - + String server, servicePrincipal, db, table, tableObjJson; @JsonProperty Long timestamp; /** * Default constructor, needed for Jackson. */ - public JSONCreateTableMessage() {} + public JSONCreateTableMessage() { + } - public JSONCreateTableMessage(String server, String servicePrincipal, String db, String table, Long timestamp) { + public JSONCreateTableMessage(String server, String servicePrincipal, String db, String table, + Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; this.db = db; @@ -47,27 +52,50 @@ public JSONCreateTableMessage(String server, String servicePrincipal, String db, checkValid(); } + public JSONCreateTableMessage(String server, String servicePrincipal, Table tableObj, + Long timestamp) { + this(server, servicePrincipal, tableObj.getDbName(), tableObj.getTableName(), timestamp); + try { + this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj); + } catch (IOException | TException e) { + throw new IllegalArgumentException("Could not serialize: ", e); + } + } + @Override - public String getServer() { return server; } + public String getServer() { + return server; + } @Override - public String getServicePrincipal() { return servicePrincipal; } + public String getServicePrincipal() { + return servicePrincipal; + } @Override - public String getDB() { return db; } + public String getDB() { + return db; + } @Override - public Long getTimestamp() { return timestamp; } + public Long getTimestamp() { + return timestamp; + } @Override - public String getTable() { return table; } + public String getTable() { + return table; + } + + public String getTableObjJson() { + return tableObjJson; + } @Override public String toString() { try { return JSONMessageDeserializer.mapper.writeValueAsString(this); - } - catch (Exception exception) { + } catch (Exception exception) { throw new IllegalArgumentException("Could not serialize: ", exception); } } diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropTableMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropTableMessage.java index 3b62023..2e5bc22 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropTableMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropTableMessage.java @@ -19,7 +19,11 @@ package org.apache.hive.hcatalog.messaging.json; +import java.io.IOException; + +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hive.hcatalog.messaging.DropTableMessage; +import org.apache.thrift.TException; import org.codehaus.jackson.annotate.JsonProperty; /** @@ -28,7 +32,7 @@ public class JSONDropTableMessage extends DropTableMessage { @JsonProperty - String server, servicePrincipal, db, table; + String server, servicePrincipal, db, table, tableObjJson; @JsonProperty Long timestamp; @@ -36,9 +40,11 @@ /** * Default constructor, needed for Jackson. */ - public JSONDropTableMessage() {} + public JSONDropTableMessage() { + } - public JSONDropTableMessage(String server, String servicePrincipal, String db, String table, Long timestamp) { + public JSONDropTableMessage(String server, String servicePrincipal, String db, String table, + Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; this.db = db; @@ -47,28 +53,49 @@ public JSONDropTableMessage(String server, String servicePrincipal, String db, S checkValid(); } + public JSONDropTableMessage(String server, String servicePrincipal, Table tableObj, Long timestamp) { + this(server, servicePrincipal, tableObj.getDbName(), tableObj.getTableName(), timestamp); + try { + this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj); + } catch (IOException | TException e) { + throw new IllegalArgumentException("Could not serialize: ", e); + } + } @Override - public String getTable() { return table; } + public String getTable() { + return table; + } @Override - public String getServer() { return server; } + public String getServer() { + return server; + } @Override - public String getServicePrincipal() { return servicePrincipal; } + public String getServicePrincipal() { + return servicePrincipal; + } @Override - public String getDB() { return db; } + public String getDB() { + return db; + } @Override - public Long getTimestamp() { return timestamp; } + public Long getTimestamp() { + return timestamp; + } + + public String getTableObjJson() { + return tableObjJson; + } @Override public String toString() { try { return JSONMessageDeserializer.mapper.writeValueAsString(this); - } - catch (Exception exception) { + } catch (Exception exception) { throw new IllegalArgumentException("Could not serialize: ", exception); } } diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageDeserializer.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageDeserializer.java index 834fdde..be2320d 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageDeserializer.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageDeserializer.java @@ -19,7 +19,6 @@ package org.apache.hive.hcatalog.messaging.json; -import org.apache.hive.hcatalog.common.HCatConstants; import org.apache.hive.hcatalog.messaging.AddPartitionMessage; import org.apache.hive.hcatalog.messaging.AlterPartitionMessage; import org.apache.hive.hcatalog.messaging.AlterTableMessage; diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java index 6b74b54..9101c47 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java @@ -22,6 +22,7 @@ import com.google.common.base.Function; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.metastore.api.Database; @@ -38,9 +39,13 @@ import org.apache.hive.hcatalog.messaging.InsertMessage; import org.apache.hive.hcatalog.messaging.MessageDeserializer; import org.apache.hive.hcatalog.messaging.MessageFactory; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; +import org.apache.thrift.protocol.TJSONProtocol; import javax.annotation.Nullable; -import java.util.Arrays; + +import java.io.IOException; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; @@ -86,20 +91,17 @@ public DropDatabaseMessage buildDropDatabaseMessage(Database db) { @Override public CreateTableMessage buildCreateTableMessage(Table table) { - return new JSONCreateTableMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table.getDbName(), - table.getTableName(), now()); + return new JSONCreateTableMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table, now()); } @Override public AlterTableMessage buildAlterTableMessage(Table before, Table after) { - return new JSONAlterTableMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, before.getDbName(), - before.getTableName(), now()); + return new JSONAlterTableMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, before, now()); } @Override public DropTableMessage buildDropTableMessage(Table table) { - return new JSONDropTableMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table.getDbName(), table.getTableName(), - now()); + return new JSONDropTableMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table, now()); } @Override @@ -127,6 +129,11 @@ public InsertMessage buildInsertMessage(String db, String table, Map cols = new ArrayList(); cols.add(new FieldSchema("col1", "int", "nocomment")); SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); - StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, - serde, null, null, emptyParameters); - Table table = new Table("mytable", "default", "me", startTime, startTime, 0, sd, null, - emptyParameters, null, null, null); + StorageDescriptor sd = + new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, serde, null, null, + emptyParameters); + Table table = + new Table("mytable", "default", "me", startTime, startTime, 0, sd, null, emptyParameters, + null, null, null); msClient.createTable(table); - NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null); assertEquals(1, rsp.getEventsSize()); - NotificationEvent event = rsp.getEvents().get(0); assertEquals(firstEventId + 1, event.getEventId()); assertTrue(event.getEventTime() >= startTime); assertEquals(HCatConstants.HCAT_CREATE_TABLE_EVENT, event.getEventType()); assertEquals("default", event.getDbName()); assertEquals("mytable", event.getTableName()); - assertTrue(event.getMessage().matches("\\{\"eventType\":\"CREATE_TABLE\",\"server\":\"\"," + - "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":\"mytable\",\"timestamp\":[0-9]+}")); + + // Parse the message field + JsonNode jsonTree = getJsonTree(event); + assertEquals(jsonTree.get("eventType").asText(), HCatConstants.HCAT_CREATE_TABLE_EVENT); + assertEquals(jsonTree.get("db").asText(), "default"); + assertEquals(jsonTree.get("table").asText(), "mytable"); + Table tableObj = getTableObj(jsonTree); + assertEquals(tableObj.getDbName(), "default"); + assertEquals(tableObj.getTableName(), "mytable"); + assertEquals(tableObj.getOwner(), "me"); + assertEquals(tableObj.getSd().compareTo(sd), 0); } @Test @@ -189,9 +208,17 @@ public void alterTable() throws Exception { assertEquals(HCatConstants.HCAT_ALTER_TABLE_EVENT, event.getEventType()); assertEquals("default", event.getDbName()); assertEquals("alttable", event.getTableName()); - assertTrue(event.getMessage().matches("\\{\"eventType\":\"ALTER_TABLE\",\"server\":\"\"," + - "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":\"alttable\"," + - "\"timestamp\":[0-9]+}")); + + // Parse the message field + JsonNode jsonTree = getJsonTree(event); + assertEquals(jsonTree.get("eventType").asText(), HCatConstants.HCAT_ALTER_TABLE_EVENT); + assertEquals(jsonTree.get("db").asText(), "default"); + assertEquals(jsonTree.get("table").asText(), "alttable"); + Table tableObj = getTableObj(jsonTree); + assertEquals(tableObj.getDbName(), "default"); + assertEquals(tableObj.getTableName(), "alttable"); + assertEquals(tableObj.getOwner(), "me"); + assertEquals(tableObj.getSd().compareTo(sd), 0); } @Test @@ -199,25 +226,33 @@ public void dropTable() throws Exception { List cols = new ArrayList(); cols.add(new FieldSchema("col1", "int", "nocomment")); SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); - StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, - serde, null, null, emptyParameters); - Table table = new Table("droptable", "default", "me", startTime, startTime, 0, sd, null, - emptyParameters, null, null, null); + StorageDescriptor sd = + new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, serde, null, null, + emptyParameters); + Table table = + new Table("droptable", "default", "me", startTime, startTime, 0, sd, null, emptyParameters, + null, null, null); msClient.createTable(table); msClient.dropTable("default", "droptable"); - NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null); assertEquals(2, rsp.getEventsSize()); - NotificationEvent event = rsp.getEvents().get(1); assertEquals(firstEventId + 2, event.getEventId()); assertTrue(event.getEventTime() >= startTime); assertEquals(HCatConstants.HCAT_DROP_TABLE_EVENT, event.getEventType()); assertEquals("default", event.getDbName()); assertEquals("droptable", event.getTableName()); - assertTrue(event.getMessage().matches("\\{\"eventType\":\"DROP_TABLE\",\"server\":\"\"," + - "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" + - "\"droptable\",\"timestamp\":[0-9]+}")); + + // Parse the message field + JsonNode jsonTree = getJsonTree(event); + assertEquals(jsonTree.get("eventType").asText(), HCatConstants.HCAT_DROP_TABLE_EVENT); + assertEquals(jsonTree.get("db").asText(), "default"); + assertEquals(jsonTree.get("table").asText(), "droptable"); + Table tableObj = getTableObj(jsonTree); + assertEquals(tableObj.getDbName(), "default"); + assertEquals(tableObj.getTableName(), "droptable"); + assertEquals(tableObj.getOwner(), "me"); + assertEquals(tableObj.getSd().compareTo(sd), 0); } @Test @@ -627,4 +662,18 @@ public void cleanupNotifs() throws Exception { LOG.info("second trigger done"); assertEquals(0, rsp2.getEventsSize()); } + + private Table getTableObj(JsonNode jsonTree) throws Exception { + TDeserializer deSerializer = new TDeserializer(new TJSONProtocol.Factory()); + Table tableObj = new Table(); + String tableJson = jsonTree.get("tableObjJson").asText(); + deSerializer.deserialize(tableObj, tableJson, "UTF-8"); + return tableObj; + } + + private JsonNode getJsonTree(NotificationEvent event) throws Exception { + JsonParser jsonParser = (new JsonFactory()).createJsonParser(event.getMessage()); + ObjectMapper mapper = new ObjectMapper(); + return mapper.readValue(jsonParser, JsonNode.class); + } }