diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAddPartitionMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAddPartitionMessage.java
index ac7dcd9..1a39794 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAddPartitionMessage.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAddPartitionMessage.java
@@ -19,9 +19,13 @@
 package org.apache.hive.hcatalog.messaging.json;
 
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hive.hcatalog.messaging.AddPartitionMessage;
+import org.apache.thrift.TException;
 import org.codehaus.jackson.annotate.JsonProperty;
 
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -31,7 +35,7 @@ public class JSONAddPartitionMessage extends AddPartitionMessage {
 
   @JsonProperty
-  String server, servicePrincipal, db, table;
+  String server, servicePrincipal, db, table, tableObjJson;
 
   @JsonProperty
   Long timestamp;
 
@@ -39,13 +43,17 @@
   @JsonProperty
   List<Map<String, String>> partitions;
 
+  @JsonProperty
+  List<String> partitionObjJson;
+
   /**
    * Default Constructor. Required for Jackson.
   */
-  public JSONAddPartitionMessage() {}
+  public JSONAddPartitionMessage() {
+  }
 
   public JSONAddPartitionMessage(String server, String servicePrincipal, String db, String table,
-    List<Map<String, String>> partitions, Long timestamp) {
+      List<Map<String, String>> partitions, Long timestamp) {
     this.server = server;
     this.servicePrincipal = servicePrincipal;
     this.db = db;
@@ -55,30 +63,61 @@ public JSONAddPartitionMessage(String server, String servicePrincipal, String db
     checkValid();
   }
 
+  public JSONAddPartitionMessage(String server, String servicePrincipal, Table tableObj,
+      Iterator<Partition> partitionsIterator, Long timestamp) {
+    this(server, servicePrincipal, tableObj.getDbName(), tableObj.getTableName(),
+        JSONMessageFactory.getPartitionKeyValues(tableObj, partitionsIterator), timestamp);
+    try {
+      this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj);
+      this.partitionObjJson = JSONMessageFactory.createPartitionObjJson(partitionsIterator);
+    } catch (TException e) {
+      throw new IllegalArgumentException("Could not serialize: ", e);
+    }
+  }
+
   @Override
-  public String getServer() { return server; }
+  public String getServer() {
+    return server;
+  }
 
   @Override
-  public String getServicePrincipal() { return servicePrincipal; }
+  public String getServicePrincipal() {
+    return servicePrincipal;
+  }
 
   @Override
-  public String getDB() { return db; }
+  public String getDB() {
+    return db;
+  }
 
   @Override
-  public String getTable() { return table; }
+  public String getTable() {
+    return table;
+  }
 
   @Override
-  public Long getTimestamp() { return timestamp; }
+  public Long getTimestamp() {
+    return timestamp;
+  }
 
   @Override
-  public List<Map<String, String>> getPartitions () { return partitions; }
+  public List<Map<String, String>> getPartitions() {
+    return partitions;
+  }
+
+  public String getTableObjJson() {
+    return tableObjJson;
+  }
+
+  public List<String> getPartitionObjJson() {
+    return partitionObjJson;
+  }
 
   @Override
   public String toString() {
     try {
       return JSONMessageDeserializer.mapper.writeValueAsString(this);
-    }
-    catch (Exception exception) {
+    } catch (Exception exception) {
       throw new IllegalArgumentException("Could not serialize: ", exception);
     }
   }
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterPartitionMessage.java
b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterPartitionMessage.java index 4f1d104..58e60bf 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterPartitionMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterPartitionMessage.java @@ -18,41 +18,36 @@ */ package org.apache.hive.hcatalog.messaging.json; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hive.hcatalog.common.HCatConstants; +import java.util.Map; + +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hive.hcatalog.messaging.AlterPartitionMessage; -import org.apache.hive.hcatalog.messaging.AlterTableMessage; +import org.apache.thrift.TException; import org.codehaus.jackson.annotate.JsonProperty; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - /** * JSON alter table message */ public class JSONAlterPartitionMessage extends AlterPartitionMessage { @JsonProperty - String server, servicePrincipal, db, table; + String server, servicePrincipal, db, table, tableObjJson, partitionObjJson; @JsonProperty Long timestamp; @JsonProperty - Map keyValues; + Map keyValues; /** * Default constructor, needed for Jackson. */ - public JSONAlterPartitionMessage() {} - - public JSONAlterPartitionMessage(String server, - String servicePrincipal, - String db, - String table, - Map keyValues, - Long timestamp) { + public JSONAlterPartitionMessage() { + } + + public JSONAlterPartitionMessage(String server, String servicePrincipal, String db, String table, + Map keyValues, Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; this.db = db; @@ -62,6 +57,17 @@ public JSONAlterPartitionMessage(String server, checkValid(); } + public JSONAlterPartitionMessage(String server, String servicePrincipal, Table tableObj, + Partition partitionObjBefore, Partition partitionObjAfter, Long timestamp) { + this(server, servicePrincipal, tableObj.getDbName(), tableObj.getTableName(), + JSONMessageFactory.getPartitionKeyValues(tableObj, partitionObjBefore), timestamp); + try { + this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj); + this.partitionObjJson = JSONMessageFactory.createPartitionObjJson(partitionObjAfter); + } catch (TException e) { + throw new IllegalArgumentException("Could not serialize: ", e); + } + } @Override public String getServer() { @@ -89,10 +95,18 @@ public String getTable() { } @Override - public Map getKeyValues() { + public Map getKeyValues() { return keyValues; } + public String getTableObjJson() { + return tableObjJson; + } + + public String getPartitionObjJson() { + return partitionObjJson; + } + @Override public String toString() { try { diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterTableMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterTableMessage.java index b057d4a..4c1e9e2 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterTableMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterTableMessage.java @@ -18,21 +18,18 @@ */ package org.apache.hive.hcatalog.messaging.json; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hive.hcatalog.common.HCatConstants; +import 
org.apache.hadoop.hive.metastore.api.Table; import org.apache.hive.hcatalog.messaging.AlterTableMessage; +import org.apache.thrift.TException; import org.codehaus.jackson.annotate.JsonProperty; -import java.util.ArrayList; -import java.util.List; - /** * JSON alter table message */ public class JSONAlterTableMessage extends AlterTableMessage { @JsonProperty - String server, servicePrincipal, db, table; + String server, servicePrincipal, db, table, tableObjJson; @JsonProperty Long timestamp; @@ -40,13 +37,11 @@ /** * Default constructor, needed for Jackson. */ - public JSONAlterTableMessage() {} + public JSONAlterTableMessage() { + } - public JSONAlterTableMessage(String server, - String servicePrincipal, - String db, - String table, - Long timestamp) { + public JSONAlterTableMessage(String server, String servicePrincipal, String db, String table, + Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; this.db = db; @@ -55,6 +50,15 @@ public JSONAlterTableMessage(String server, checkValid(); } + public JSONAlterTableMessage(String server, String servicePrincipal, Table tableObj, + Long timestamp) { + this(server, servicePrincipal, tableObj.getDbName(), tableObj.getTableName(), timestamp); + try { + this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj); + } catch (TException e) { + throw new IllegalArgumentException("Could not serialize: ", e); + } + } @Override public String getServer() { @@ -81,6 +85,10 @@ public String getTable() { return table; } + public String getTableObjJson() { + return tableObjJson; + } + @Override public String toString() { try { diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateTableMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateTableMessage.java index 9c66730..dd26f80 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateTableMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateTableMessage.java @@ -19,7 +19,9 @@ package org.apache.hive.hcatalog.messaging.json; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hive.hcatalog.messaging.CreateTableMessage; +import org.apache.thrift.TException; import org.codehaus.jackson.annotate.JsonProperty; /** @@ -28,17 +30,18 @@ public class JSONCreateTableMessage extends CreateTableMessage { @JsonProperty - String server, servicePrincipal, db, table; - + String server, servicePrincipal, db, table, tableObjJson; @JsonProperty Long timestamp; /** * Default constructor, needed for Jackson. 
*/ - public JSONCreateTableMessage() {} + public JSONCreateTableMessage() { + } - public JSONCreateTableMessage(String server, String servicePrincipal, String db, String table, Long timestamp) { + public JSONCreateTableMessage(String server, String servicePrincipal, String db, String table, + Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; this.db = db; @@ -47,27 +50,50 @@ public JSONCreateTableMessage(String server, String servicePrincipal, String db, checkValid(); } + public JSONCreateTableMessage(String server, String servicePrincipal, Table tableObj, + Long timestamp) { + this(server, servicePrincipal, tableObj.getDbName(), tableObj.getTableName(), timestamp); + try { + this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj); + } catch (TException e) { + throw new IllegalArgumentException("Could not serialize: ", e); + } + } + @Override - public String getServer() { return server; } + public String getServer() { + return server; + } @Override - public String getServicePrincipal() { return servicePrincipal; } + public String getServicePrincipal() { + return servicePrincipal; + } @Override - public String getDB() { return db; } + public String getDB() { + return db; + } @Override - public Long getTimestamp() { return timestamp; } + public Long getTimestamp() { + return timestamp; + } @Override - public String getTable() { return table; } + public String getTable() { + return table; + } + + public String getTableObjJson() { + return tableObjJson; + } @Override public String toString() { try { return JSONMessageDeserializer.mapper.writeValueAsString(this); - } - catch (Exception exception) { + } catch (Exception exception) { throw new IllegalArgumentException("Could not serialize: ", exception); } } diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropPartitionMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropPartitionMessage.java index a4d6400..de9e78b 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropPartitionMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropPartitionMessage.java @@ -19,9 +19,13 @@ package org.apache.hive.hcatalog.messaging.json; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hive.hcatalog.messaging.DropPartitionMessage; +import org.apache.thrift.TException; import org.codehaus.jackson.annotate.JsonProperty; +import java.util.Iterator; import java.util.List; import java.util.Map; @@ -31,7 +35,7 @@ public class JSONDropPartitionMessage extends DropPartitionMessage { @JsonProperty - String server, servicePrincipal, db, table; + String server, servicePrincipal, db, table, tableObjJson; @JsonProperty Long timestamp; @@ -39,13 +43,17 @@ @JsonProperty List> partitions; + @JsonProperty + List partitionObjJson; + /** * Default Constructor. Required for Jackson. 
*/ - public JSONDropPartitionMessage() {} + public JSONDropPartitionMessage() { + } public JSONDropPartitionMessage(String server, String servicePrincipal, String db, String table, - List> partitions, Long timestamp) { + List> partitions, Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; this.db = db; @@ -55,31 +63,61 @@ public JSONDropPartitionMessage(String server, String servicePrincipal, String d checkValid(); } + public JSONDropPartitionMessage(String server, String servicePrincipal, Table tableObj, + Iterator partitionsIterator, Long timestamp) { + this(server, servicePrincipal, tableObj.getDbName(), tableObj.getTableName(), + JSONMessageFactory.getPartitionKeyValues(tableObj, partitionsIterator), timestamp); + try { + this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj); + this.partitionObjJson = JSONMessageFactory.createPartitionObjJson(partitionsIterator); + } catch (TException e) { + throw new IllegalArgumentException("Could not serialize: ", e); + } + } @Override - public String getServer() { return server; } + public String getServer() { + return server; + } @Override - public String getServicePrincipal() { return servicePrincipal; } + public String getServicePrincipal() { + return servicePrincipal; + } @Override - public String getDB() { return db; } + public String getDB() { + return db; + } @Override - public String getTable() { return table; } + public String getTable() { + return table; + } @Override - public Long getTimestamp() { return timestamp; } + public Long getTimestamp() { + return timestamp; + } @Override - public List> getPartitions () { return partitions; } + public List> getPartitions() { + return partitions; + } + + public String getTableObjJson() { + return tableObjJson; + } + + public List getPartitionObjJson() { + return partitionObjJson; + } @Override public String toString() { try { return JSONMessageDeserializer.mapper.writeValueAsString(this); - } - catch (Exception exception) { + } catch (Exception exception) { throw new IllegalArgumentException("Could not serialize: ", exception); } } diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropTableMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropTableMessage.java index 3b62023..34972df 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropTableMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropTableMessage.java @@ -19,7 +19,9 @@ package org.apache.hive.hcatalog.messaging.json; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hive.hcatalog.messaging.DropTableMessage; +import org.apache.thrift.TException; import org.codehaus.jackson.annotate.JsonProperty; /** @@ -28,7 +30,7 @@ public class JSONDropTableMessage extends DropTableMessage { @JsonProperty - String server, servicePrincipal, db, table; + String server, servicePrincipal, db, table, tableObjJson; @JsonProperty Long timestamp; @@ -36,9 +38,11 @@ /** * Default constructor, needed for Jackson. 
*/ - public JSONDropTableMessage() {} + public JSONDropTableMessage() { + } - public JSONDropTableMessage(String server, String servicePrincipal, String db, String table, Long timestamp) { + public JSONDropTableMessage(String server, String servicePrincipal, String db, String table, + Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; this.db = db; @@ -47,28 +51,49 @@ public JSONDropTableMessage(String server, String servicePrincipal, String db, S checkValid(); } + public JSONDropTableMessage(String server, String servicePrincipal, Table tableObj, Long timestamp) { + this(server, servicePrincipal, tableObj.getDbName(), tableObj.getTableName(), timestamp); + try { + this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj); + } catch (TException e) { + throw new IllegalArgumentException("Could not serialize: ", e); + } + } @Override - public String getTable() { return table; } + public String getTable() { + return table; + } @Override - public String getServer() { return server; } + public String getServer() { + return server; + } @Override - public String getServicePrincipal() { return servicePrincipal; } + public String getServicePrincipal() { + return servicePrincipal; + } @Override - public String getDB() { return db; } + public String getDB() { + return db; + } @Override - public Long getTimestamp() { return timestamp; } + public Long getTimestamp() { + return timestamp; + } + + public String getTableObjJson() { + return tableObjJson; + } @Override public String toString() { try { return JSONMessageDeserializer.mapper.writeValueAsString(this); - } - catch (Exception exception) { + } catch (Exception exception) { throw new IllegalArgumentException("Could not serialize: ", exception); } } diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageDeserializer.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageDeserializer.java index 834fdde..be2320d 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageDeserializer.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageDeserializer.java @@ -19,7 +19,6 @@ package org.apache.hive.hcatalog.messaging.json; -import org.apache.hive.hcatalog.common.HCatConstants; import org.apache.hive.hcatalog.messaging.AddPartitionMessage; import org.apache.hive.hcatalog.messaging.AlterPartitionMessage; import org.apache.hive.hcatalog.messaging.AlterTableMessage; diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java index 6b74b54..8436557 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java @@ -19,11 +19,14 @@ package org.apache.hive.hcatalog.messaging.json; -import com.google.common.base.Function; -import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import javax.annotation.Nullable; + import org.apache.hadoop.hive.metastore.api.Database; import 
org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -38,23 +41,24 @@
 import org.apache.hive.hcatalog.messaging.InsertMessage;
 import org.apache.hive.hcatalog.messaging.MessageDeserializer;
 import org.apache.hive.hcatalog.messaging.MessageFactory;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TJSONProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 
 /**
- * The JSON implementation of the MessageFactory. Constructs JSON implementations of
- * each message-type.
+ * The JSON implementation of the MessageFactory. Constructs JSON implementations of each
+ * message-type.
  */
 public class JSONMessageFactory extends MessageFactory {
 
   private static final Logger LOG = LoggerFactory.getLogger(JSONMessageFactory.class.getName());
-
   private static JSONMessageDeserializer deserializer = new JSONMessageDeserializer();
 
   @Override
@@ -80,49 +84,48 @@ public CreateDatabaseMessage buildCreateDatabaseMessage(Database db) {
 
   @Override
   public DropDatabaseMessage buildDropDatabaseMessage(Database db) {
-    return new JSONDropDatabaseMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, db.getName(),
-        now());
+    return new JSONDropDatabaseMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, db.getName(), now());
   }
 
   @Override
   public CreateTableMessage buildCreateTableMessage(Table table) {
-    return new JSONCreateTableMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table.getDbName(),
-        table.getTableName(), now());
+    return new JSONCreateTableMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table, now());
   }
 
   @Override
   public AlterTableMessage buildAlterTableMessage(Table before, Table after) {
-    return new JSONAlterTableMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, before.getDbName(),
-        before.getTableName(), now());
+    return new JSONAlterTableMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, after, now());
   }
 
   @Override
   public DropTableMessage buildDropTableMessage(Table table) {
-    return new JSONDropTableMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table.getDbName(), table.getTableName(),
-        now());
+    return new JSONDropTableMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table, now());
  }
 
   @Override
-  public AddPartitionMessage buildAddPartitionMessage(Table table, Iterator<Partition> partitionsIterator) {
-    return new JSONAddPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table.getDbName(),
-        table.getTableName(), getPartitionKeyValues(table, partitionsIterator), now());
+  public AddPartitionMessage buildAddPartitionMessage(Table table,
+      Iterator<Partition> partitionsIterator) {
+    return new JSONAddPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table,
+        partitionsIterator, now());
   }
 
   @Override
-  public AlterPartitionMessage buildAlterPartitionMessage(Table table, Partition before, Partition after) {
-    return new JSONAlterPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL,
-        before.getDbName(), before.getTableName(), getPartitionKeyValues(table,before),now());
+  public AlterPartitionMessage buildAlterPartitionMessage(Table table, Partition before,
+      Partition after) {
+    return new JSONAlterPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table, before,
+        after, now());
   }
 
   @Override
-  public DropPartitionMessage buildDropPartitionMessage(Table table, Iterator<Partition> partitions) {
-    return new JSONDropPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table.getDbName(),
-        table.getTableName(), getPartitionKeyValues(table, partitions), now());
+  public DropPartitionMessage buildDropPartitionMessage(Table table,
+      Iterator<Partition> partitionsIterator) {
+    return new JSONDropPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table,
+        partitionsIterator, now());
   }
 
   @Override
-  public InsertMessage buildInsertMessage(String db, String table, Map<String, String> partKeyVals,
-    List<String> files) {
+  public InsertMessage buildInsertMessage(String db, String table, Map<String, String> partKeyVals,
+      List<String> files) {
     return new JSONInsertMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, db, table, partKeyVals,
         files, now());
   }
@@ -131,20 +134,41 @@ private long now() {
     return System.currentTimeMillis() / 1000;
   }
 
-  private static Map<String, String> getPartitionKeyValues(Table table, Partition partition) {
+  static Map<String, String> getPartitionKeyValues(Table table, Partition partition) {
     Map<String, String> partitionKeys = new LinkedHashMap<String, String>();
-    for (int i=0; i<table.getPartitionKeysSize(); ++i)
+    for (int i = 0; i < table.getPartitionKeysSize(); ++i)
       partitionKeys.put(table.getPartitionKeys().get(i).getName(), partition.getValues().get(i));
     return partitionKeys;
   }
 
-  private static List<Map<String, String>> getPartitionKeyValues(final Table table, Iterator<Partition> iterator) {
-    return Lists.newArrayList(Iterators.transform(iterator, new Function<Partition, Map<String, String>>() {
-      @Override
-      public Map<String, String> apply(@Nullable Partition partition) {
-        return getPartitionKeyValues(table, partition);
-      }
-    }));
+  static List<Map<String, String>> getPartitionKeyValues(final Table table,
+      Iterator<Partition> iterator) {
+    return Lists.newArrayList(Iterators.transform(iterator,
+        new Function<Partition, Map<String, String>>() {
+          @Override
+          public Map<String, String> apply(@Nullable Partition partition) {
+            return getPartitionKeyValues(table, partition);
+          }
+        }));
+  }
+
+  static String createTableObjJson(Table tableObj) throws TException {
+    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+    return serializer.toString(tableObj, "UTF-8");
+  }
+
+  static String createPartitionObjJson(Partition partitionObj) throws TException {
+    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+    return serializer.toString(partitionObj, "UTF-8");
+  }
+
+  static List<String> createPartitionObjJson(Iterator<Partition> partitionsIterator)
+      throws TException {
+    List<String> partitionsJson = new ArrayList<String>();
+    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+    while (partitionsIterator.hasNext()) {
+      partitionsJson.add(serializer.toString(partitionsIterator.next(), "UTF-8"));
+    }
+    return partitionsJson;
   }
 }
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
index 1cd32d5..cffd6d8 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
@@ -15,16 +15,21 @@
  * KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations
  * under the License.
-*/ + */ package org.apache.hive.hcatalog.listener; -import static junit.framework.Assert.assertEquals; -import static junit.framework.Assert.assertNull; -import static junit.framework.Assert.assertTrue; -import static junit.framework.Assert.fail; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; @@ -36,27 +41,32 @@ import org.apache.hadoop.hive.metastore.api.InsertEventRequestData; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; +import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.SkewedInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hive.hcatalog.common.HCatConstants; +import org.apache.thrift.TDeserializer; +import org.apache.thrift.protocol.TJSONProtocol; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.JsonParser; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.node.ObjectNode; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; - -import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TestDbNotificationListener { - private static final Logger LOG = LoggerFactory.getLogger(TestDbNotificationListener.class.getName()); + private static final Logger LOG = LoggerFactory.getLogger(TestDbNotificationListener.class + .getName()); private static final int EVENTS_TTL = 30; private static final int CLEANUP_SLEEP_TIME = 10; private static Map emptyParameters = new HashMap(); @@ -71,12 +81,11 @@ public static void connectToMetastore() throws Exception { HiveConf conf = new HiveConf(); conf.setVar(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS, DbNotificationListener.class.getName()); - conf.setVar(HiveConf.ConfVars.METASTORE_EVENT_DB_LISTENER_TTL, String.valueOf(EVENTS_TTL)+"s"); + conf.setVar(HiveConf.ConfVars.METASTORE_EVENT_DB_LISTENER_TTL, String.valueOf(EVENTS_TTL) + "s"); conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false); conf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true); conf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict"); - conf.setVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL, - DummyRawStoreFailEvent.class.getName()); + conf.setVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL, DummyRawStoreFailEvent.class.getName()); Class dbNotificationListener = Class.forName("org.apache.hive.hcatalog.listener.DbNotificationListener"); Class[] classes = dbNotificationListener.getDeclaredClasses(); @@ -87,8 +96,7 @@ public static void 
connectToMetastore() throws Exception { sleepTimeField.set(null, CLEANUP_SLEEP_TIME * 1000); } } - conf - .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, + conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); SessionState.start(new CliSessionState(conf)); msClient = new HiveMetaStoreClient(conf); @@ -99,8 +107,10 @@ public static void connectToMetastore() throws Exception { public void setup() throws Exception { long now = System.currentTimeMillis() / 1000; startTime = 0; - if (now > Integer.MAX_VALUE) fail("Bummer, time has fallen over the edge"); - else startTime = (int) now; + if (now > Integer.MAX_VALUE) + fail("Bummer, time has fallen over the edge"); + else + startTime = (int) now; firstEventId = msClient.getCurrentNotificationEventId().getEventId(); DummyRawStoreFailEvent.setEventSucceed(true); } @@ -119,8 +129,9 @@ public void createDatabase() throws Exception { assertEquals(HCatConstants.HCAT_CREATE_DATABASE_EVENT, event.getEventType()); assertEquals("mydb", event.getDbName()); assertNull(event.getTableName()); - assertTrue(event.getMessage().matches("\\{\"eventType\":\"CREATE_DATABASE\",\"server\":\"\"," + - "\"servicePrincipal\":\"\",\"db\":\"mydb\",\"timestamp\":[0-9]+}")); + assertTrue(event.getMessage().matches( + "\\{\"eventType\":\"CREATE_DATABASE\",\"server\":\"\"," + + "\"servicePrincipal\":\"\",\"db\":\"mydb\",\"timestamp\":[0-9]+}")); DummyRawStoreFailEvent.setEventSucceed(false); db = new Database("mydb2", "no description", "file:/tmp", emptyParameters); @@ -135,6 +146,7 @@ public void createDatabase() throws Exception { } @Test + @SuppressWarnings("deprecation") public void dropDatabase() throws Exception { Database db = new Database("dropdb", "no description", "file:/tmp", emptyParameters); msClient.createDatabase(db); @@ -149,8 +161,9 @@ public void dropDatabase() throws Exception { assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT, event.getEventType()); assertEquals("dropdb", event.getDbName()); assertNull(event.getTableName()); - assertTrue(event.getMessage().matches("\\{\"eventType\":\"DROP_DATABASE\",\"server\":\"\"," + - "\"servicePrincipal\":\"\",\"db\":\"dropdb\",\"timestamp\":[0-9]+}")); + assertTrue(event.getMessage().matches( + "\\{\"eventType\":\"DROP_DATABASE\",\"server\":\"\"," + + "\"servicePrincipal\":\"\",\"db\":\"dropdb\",\"timestamp\":[0-9]+}")); db = new Database("dropdb", "no description", "file:/tmp", emptyParameters); msClient.createDatabase(db); @@ -169,11 +182,11 @@ public void dropDatabase() throws Exception { public void createTable() throws Exception { List cols = new ArrayList(); cols.add(new FieldSchema("col1", "int", "nocomment")); - SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); - StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, - serde, null, null, emptyParameters); - Table table = new Table("mytable", "default", "me", startTime, startTime, 0, sd, null, - emptyParameters, null, null, null); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", new HashMap()); + StorageDescriptor sd = getStorageDescriptorForTest(cols, serde); + Table table = + new Table("mytable", "default", "me", startTime, startTime, 0, sd, null, emptyParameters, + null, null, null); msClient.createTable(table); NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null); @@ -185,11 +198,21 @@ public void createTable() throws Exception { 
assertEquals(HCatConstants.HCAT_CREATE_TABLE_EVENT, event.getEventType()); assertEquals("default", event.getDbName()); assertEquals("mytable", event.getTableName()); - assertTrue(event.getMessage().matches("\\{\"eventType\":\"CREATE_TABLE\",\"server\":\"\"," + - "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":\"mytable\",\"timestamp\":[0-9]+}")); - table = new Table("mytable2", "default", "me", startTime, startTime, 0, sd, null, - emptyParameters, null, null, null); + // Parse the message field + ObjectNode jsonTree = getJsonTree(event); + assertEquals(HCatConstants.HCAT_CREATE_TABLE_EVENT, jsonTree.get("eventType").asText()); + assertEquals("default", jsonTree.get("db").asText()); + assertEquals("mytable", jsonTree.get("table").asText()); + Table tableObj = getTableObj(jsonTree); + assertEquals("default", tableObj.getDbName()); + assertEquals("mytable", tableObj.getTableName()); + assertEquals("me", tableObj.getOwner()); + assertEquals(sd, tableObj.getSd()); + + table = + new Table("mytable2", "default", "me", startTime, startTime, 0, sd, null, emptyParameters, + null, null, null); DummyRawStoreFailEvent.setEventSucceed(false); try { msClient.createTable(table); @@ -205,19 +228,23 @@ public void createTable() throws Exception { public void alterTable() throws Exception { List cols = new ArrayList(); cols.add(new FieldSchema("col1", "int", "nocomment")); - SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); - StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, - serde, null, null, emptyParameters); - Table table = new Table("alttable", "default", "me", startTime, startTime, 0, sd, - new ArrayList(), emptyParameters, null, null, null); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", new HashMap()); + StorageDescriptor sd = getStorageDescriptorForTest(cols, serde); + Table table = + new Table("alttable", "default", "me", startTime, startTime, 0, sd, + new ArrayList(), emptyParameters, null, null, null); + // Event 1 msClient.createTable(table); - + // Add a column to sd cols.add(new FieldSchema("col2", "int", "")); - table = new Table("alttable", "default", "me", startTime, startTime, 0, sd, - new ArrayList(), emptyParameters, null, null, null); + table = + new Table("alttable", "default", "me", startTime, startTime, 0, sd, + new ArrayList(), emptyParameters, null, null, null); + // Event 2 msClient.alter_table("default", "alttable", table); NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null); + // We should get 2 events assertEquals(2, rsp.getEventsSize()); NotificationEvent event = rsp.getEvents().get(1); @@ -226,9 +253,17 @@ public void alterTable() throws Exception { assertEquals(HCatConstants.HCAT_ALTER_TABLE_EVENT, event.getEventType()); assertEquals("default", event.getDbName()); assertEquals("alttable", event.getTableName()); - assertTrue(event.getMessage().matches("\\{\"eventType\":\"ALTER_TABLE\",\"server\":\"\"," + - "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":\"alttable\"," + - "\"timestamp\":[0-9]+}")); + + // Parse the message field of the event + JsonNode jsonTree = getJsonTree(event); + assertEquals(HCatConstants.HCAT_ALTER_TABLE_EVENT, jsonTree.get("eventType").asText()); + assertEquals("default", jsonTree.get("db").asText()); + assertEquals("alttable", jsonTree.get("table").asText()); + Table tableObj = getTableObj(jsonTree); + assertEquals("default", tableObj.getDbName()); + assertEquals("alttable", tableObj.getTableName()); + assertEquals("me", 
tableObj.getOwner()); + assertEquals(table.getSd(), tableObj.getSd()); DummyRawStoreFailEvent.setEventSucceed(false); try { @@ -245,12 +280,14 @@ public void alterTable() throws Exception { public void dropTable() throws Exception { List cols = new ArrayList(); cols.add(new FieldSchema("col1", "int", "nocomment")); - SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); - StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, - serde, null, null, emptyParameters); - Table table = new Table("droptable", "default", "me", startTime, startTime, 0, sd, null, - emptyParameters, null, null, null); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", new HashMap()); + StorageDescriptor sd = getStorageDescriptorForTest(cols, serde); + Table table = + new Table("droptable", "default", "me", startTime, startTime, 0, sd, + new ArrayList(), emptyParameters, null, null, null); + // Event 1 msClient.createTable(table); + // Event 2 msClient.dropTable("default", "droptable"); NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null); @@ -262,12 +299,21 @@ public void dropTable() throws Exception { assertEquals(HCatConstants.HCAT_DROP_TABLE_EVENT, event.getEventType()); assertEquals("default", event.getDbName()); assertEquals("droptable", event.getTableName()); - assertTrue(event.getMessage().matches("\\{\"eventType\":\"DROP_TABLE\",\"server\":\"\"," + - "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" + - "\"droptable\",\"timestamp\":[0-9]+}")); - table = new Table("droptable2", "default", "me", startTime, startTime, 0, sd, null, - emptyParameters, null, null, null); + // Parse the message field + JsonNode jsonTree = getJsonTree(event); + assertEquals(HCatConstants.HCAT_DROP_TABLE_EVENT, jsonTree.get("eventType").asText()); + assertEquals("default", jsonTree.get("db").asText()); + assertEquals("droptable", jsonTree.get("table").asText()); + Table tableObj = getTableObj(jsonTree); + assertEquals("default", tableObj.getDbName()); + assertEquals("droptable", tableObj.getTableName()); + assertEquals("me", tableObj.getOwner()); + assertEquals(table.getSd(), tableObj.getSd()); + + table = + new Table("droptable2", "default", "me", startTime, startTime, 0, sd, null, + emptyParameters, null, null, null); msClient.createTable(table); DummyRawStoreFailEvent.setEventSucceed(false); try { @@ -287,14 +333,17 @@ public void addPartition() throws Exception { List partCols = new ArrayList(); partCols.add(new FieldSchema("ds", "string", "")); SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); - StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, - serde, null, null, emptyParameters); - Table table = new Table("addPartTable", "default", "me", startTime, startTime, 0, sd, partCols, - emptyParameters, null, null, null); + StorageDescriptor sd = + new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, serde, null, null, + emptyParameters); + Table table = + new Table("addPartTable", "default", "me", startTime, startTime, 0, sd, partCols, + emptyParameters, null, null, null); msClient.createTable(table); - Partition partition = new Partition(Arrays.asList("today"), "default", "addPartTable", - startTime, startTime, sd, emptyParameters); + Partition partition = + new Partition(Arrays.asList("today"), "default", "addPartTable", startTime, startTime, sd, + emptyParameters); msClient.add_partition(partition); NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 
0, null); @@ -306,12 +355,10 @@ public void addPartition() throws Exception { assertEquals(HCatConstants.HCAT_ADD_PARTITION_EVENT, event.getEventType()); assertEquals("default", event.getDbName()); assertEquals("addparttable", event.getTableName()); - assertTrue(event.getMessage().matches("\\{\"eventType\":\"ADD_PARTITION\",\"server\":\"\"," + - "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" + - "\"addparttable\",\"timestamp\":[0-9]+,\"partitions\":\\[\\{\"ds\":\"today\"}]}")); - partition = new Partition(Arrays.asList("tomorrow"), "default", "tableDoesNotExist", - startTime, startTime, sd, emptyParameters); + partition = + new Partition(Arrays.asList("tomorrow"), "default", "tableDoesNotExist", startTime, + startTime, sd, emptyParameters); DummyRawStoreFailEvent.setEventSucceed(false); try { msClient.add_partition(partition); @@ -330,18 +377,22 @@ public void alterPartition() throws Exception { List partCols = new ArrayList(); partCols.add(new FieldSchema("ds", "string", "")); SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); - StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, - serde, null, null, emptyParameters); - Table table = new Table("alterparttable", "default", "me", startTime, startTime, 0, sd, - partCols, emptyParameters, null, null, null); + StorageDescriptor sd = + new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, serde, null, null, + emptyParameters); + Table table = + new Table("alterparttable", "default", "me", startTime, startTime, 0, sd, partCols, + emptyParameters, null, null, null); msClient.createTable(table); - Partition partition = new Partition(Arrays.asList("today"), "default", "alterparttable", - startTime, startTime, sd, emptyParameters); + Partition partition = + new Partition(Arrays.asList("today"), "default", "alterparttable", startTime, startTime, + sd, emptyParameters); msClient.add_partition(partition); - Partition newPart = new Partition(Arrays.asList("today"), "default", "alterparttable", - startTime, startTime + 1, sd, emptyParameters); + Partition newPart = + new Partition(Arrays.asList("today"), "default", "alterparttable", startTime, + startTime + 1, sd, emptyParameters); msClient.alter_partition("default", "alterparttable", newPart, null); NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null); @@ -352,10 +403,6 @@ public void alterPartition() throws Exception { assertEquals(HCatConstants.HCAT_ALTER_PARTITION_EVENT, event.getEventType()); assertEquals("default", event.getDbName()); assertEquals("alterparttable", event.getTableName()); - assertTrue(event.getMessage(), - event.getMessage().matches("\\{\"eventType\":\"ALTER_PARTITION\",\"server\":\"\"," + - "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":\"alterparttable\"," + - "\"timestamp\":[0-9]+,\"keyValues\":\\{\"ds\":\"today\"}}")); DummyRawStoreFailEvent.setEventSucceed(false); try { @@ -375,14 +422,17 @@ public void dropPartition() throws Exception { List partCols = new ArrayList(); partCols.add(new FieldSchema("ds", "string", "")); SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); - StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, - serde, null, null, emptyParameters); - Table table = new Table("dropPartTable", "default", "me", startTime, startTime, 0, sd, partCols, - emptyParameters, null, null, null); + StorageDescriptor sd = + new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, serde, null, null, + 
emptyParameters); + Table table = + new Table("dropPartTable", "default", "me", startTime, startTime, 0, sd, partCols, + emptyParameters, null, null, null); msClient.createTable(table); - Partition partition = new Partition(Arrays.asList("today"), "default", "dropPartTable", - startTime, startTime, sd, emptyParameters); + Partition partition = + new Partition(Arrays.asList("today"), "default", "dropPartTable", startTime, startTime, sd, + emptyParameters); msClient.add_partition(partition); msClient.dropPartition("default", "dropparttable", Arrays.asList("today"), false); @@ -396,12 +446,10 @@ public void dropPartition() throws Exception { assertEquals(HCatConstants.HCAT_DROP_PARTITION_EVENT, event.getEventType()); assertEquals("default", event.getDbName()); assertEquals("dropparttable", event.getTableName()); - assertTrue(event.getMessage().matches("\\{\"eventType\":\"DROP_PARTITION\",\"server\":\"\"," + - "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" + - "\"dropparttable\",\"timestamp\":[0-9]+,\"partitions\":\\[\\{\"ds\":\"today\"}]}")); - partition = new Partition(Arrays.asList("tomorrow"), "default", "dropPartTable", - startTime, startTime, sd, emptyParameters); + partition = + new Partition(Arrays.asList("tomorrow"), "default", "dropPartTable", startTime, startTime, + sd, emptyParameters); msClient.add_partition(partition); DummyRawStoreFailEvent.setEventSucceed(false); try { @@ -419,10 +467,12 @@ public void insertTable() throws Exception { List cols = new ArrayList(); cols.add(new FieldSchema("col1", "int", "nocomment")); SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); - StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, - serde, null, null, emptyParameters); - Table table = new Table("insertTable", "default", "me", startTime, startTime, 0, sd, null, - emptyParameters, null, null, null); + StorageDescriptor sd = + new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, serde, null, null, + emptyParameters); + Table table = + new Table("insertTable", "default", "me", startTime, startTime, 0, sd, null, + emptyParameters, null, null, null); msClient.createTable(table); FireEventRequestData data = new FireEventRequestData(); @@ -443,11 +493,11 @@ public void insertTable() throws Exception { assertEquals(HCatConstants.HCAT_INSERT_EVENT, event.getEventType()); assertEquals("default", event.getDbName()); assertEquals("insertTable", event.getTableName()); - assertTrue(event.getMessage(), - event.getMessage().matches("\\{\"eventType\":\"INSERT\",\"server\":\"\"," + - "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" + - "\"insertTable\",\"timestamp\":[0-9]+,\"files\":\\[\"/warehouse/mytable/b1\"]," + - "\"partKeyVals\":\\{},\"partitionKeyValues\":\\{}}")); + assertTrue(event.getMessage(), event.getMessage().matches( + "\\{\"eventType\":\"INSERT\",\"server\":\"\"," + + "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" + + "\"insertTable\",\"timestamp\":[0-9]+,\"files\":\\[\"/warehouse/mytable/b1\"]," + + "\"partKeyVals\":\\{},\"partitionKeyValues\":\\{}}")); } @Test @@ -457,13 +507,16 @@ public void insertPartition() throws Exception { List partCols = new ArrayList(); partCols.add(new FieldSchema("ds", "string", "")); SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); - StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, - serde, null, null, emptyParameters); - Table table = new Table("insertPartition", "default", "me", startTime, startTime, 0, sd, 
- partCols, emptyParameters, null, null, null); + StorageDescriptor sd = + new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, serde, null, null, + emptyParameters); + Table table = + new Table("insertPartition", "default", "me", startTime, startTime, 0, sd, partCols, + emptyParameters, null, null, null); msClient.createTable(table); - Partition partition = new Partition(Arrays.asList("today"), "default", "insertPartition", - startTime, startTime, sd, emptyParameters); + Partition partition = + new Partition(Arrays.asList("today"), "default", "insertPartition", startTime, startTime, + sd, emptyParameters); msClient.add_partition(partition); FireEventRequestData data = new FireEventRequestData(); @@ -485,12 +538,12 @@ public void insertPartition() throws Exception { assertEquals(HCatConstants.HCAT_INSERT_EVENT, event.getEventType()); assertEquals("default", event.getDbName()); assertEquals("insertPartition", event.getTableName()); - assertTrue(event.getMessage(), - event.getMessage().matches("\\{\"eventType\":\"INSERT\",\"server\":\"\"," + - "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" + - "\"insertPartition\",\"timestamp\":[0-9]+," + - "\"files\":\\[\"/warehouse/mytable/today/b1\"],\"partKeyVals\":\\{\"ds\":\"today\"}," + - "\"partitionKeyValues\":\\{\"ds\":\"today\"}}")); + assertTrue(event.getMessage(), event.getMessage().matches( + "\\{\"eventType\":\"INSERT\",\"server\":\"\"," + + "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" + + "\"insertPartition\",\"timestamp\":[0-9]+," + + "\"files\":\\[\"/warehouse/mytable/today/b1\"],\"partKeyVals\":\\{\"ds\":\"today\"}," + + "\"partitionKeyValues\":\\{\"ds\":\"today\"}}")); } @Test @@ -558,7 +611,7 @@ public void sqlInsertTable() throws Exception { NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null); // For reasons not clear to me there's an alter after the create table and one after the - // insert. I think the one after the insert is a stats calculation. + // insert. I think the one after the insert is a stats calculation. assertEquals(6, rsp.getEventsSize()); NotificationEvent event = rsp.getEvents().get(0); assertEquals(firstEventId + 1, event.getEventId()); @@ -642,11 +695,13 @@ public void sqlInsertPartition() throws Exception { driver.run("alter table sip drop partition (ds = 'tomorrow')"); driver.run("insert into table sip partition (ds) values (42, 'todaytwo')"); - driver.run("insert overwrite table sip partition(ds='todaytwo') select c from sip where 'ds'='today'"); + driver + .run("insert overwrite table sip partition(ds='todaytwo') select c from sip where 'ds'='today'"); NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null); - for (NotificationEvent ne : rsp.getEvents()) LOG.debug("EVENT: " + ne.getMessage()); + for (NotificationEvent ne : rsp.getEvents()) + LOG.debug("EVENT: " + ne.getMessage()); // For reasons not clear to me there's one or more alter partitions after add partition and // insert. 
    assertEquals(24, rsp.getEventsSize());
@@ -691,7 +746,8 @@ public void sqlInsertPartition() throws Exception {
     event = rsp.getEvents().get(21);
     assertEquals(firstEventId + 22, event.getEventId());
     assertEquals(HCatConstants.HCAT_INSERT_EVENT, event.getEventType());
-    assertTrue(event.getMessage().matches(".*\"files\":\\[\\].*")); // replace-overwrite introduces no new files
+    assertTrue(event.getMessage().matches(".*\"files\":\\[\\].*")); // replace-overwrite introduces
+                                                                    // no new files
     event = rsp.getEvents().get(22);
     assertEquals(firstEventId + 23, event.getEventId());
     assertEquals(HCatConstants.HCAT_ALTER_PARTITION_EVENT, event.getEventType());
@@ -701,11 +757,11 @@
     assertEquals(HCatConstants.HCAT_ALTER_PARTITION_EVENT, event.getEventType());
     assertTrue(event.getMessage().matches(".*\"ds\":\"todaytwo\".*"));
-    }
+  }
 
   @Test
   public void cleanupNotifs() throws Exception {
-    Database db = new Database("cleanup1","no description","file:/tmp", emptyParameters);
+    Database db = new Database("cleanup1", "no description", "file:/tmp", emptyParameters);
     msClient.createDatabase(db);
     msClient.dropDatabase("cleanup1");
@@ -714,11 +770,41 @@ public void cleanupNotifs() throws Exception {
     assertEquals(2, rsp.getEventsSize());
 
     // sleep for expiry time, and then fetch again
-    Thread.sleep(EVENTS_TTL * 2 * 1000); // sleep twice the TTL interval - things should have been cleaned by then.
+    Thread.sleep(EVENTS_TTL * 2 * 1000); // sleep twice the TTL interval - things should have been
+                                         // cleaned by then.
 
     LOG.info("Pulling events again after cleanup");
     NotificationEventResponse rsp2 = msClient.getNextNotification(firstEventId, 0, null);
     LOG.info("second trigger done");
     assertEquals(0, rsp2.getEventsSize());
   }
+
+  private Table getTableObj(JsonNode jsonTree) throws Exception {
+    TDeserializer deSerializer = new TDeserializer(new TJSONProtocol.Factory());
+    Table tableObj = new Table();
+    String tableJson = jsonTree.get("tableObjJson").asText();
+    deSerializer.deserialize(tableObj, tableJson, "UTF-8");
+    return tableObj;
+  }
+
+  private ObjectNode getJsonTree(NotificationEvent event) throws Exception {
+    JsonParser jsonParser = (new JsonFactory()).createJsonParser(event.getMessage());
+    ObjectMapper mapper = new ObjectMapper();
+    return mapper.readValue(jsonParser, ObjectNode.class);
+  }
+
+  private StorageDescriptor getStorageDescriptorForTest(List<FieldSchema> cols, SerDeInfo serde) {
+    StorageDescriptor sd =
+        new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, serde, null, null,
+            emptyParameters);
+    sd.setBucketCols(new ArrayList<String>());
+    sd.setSortCols(new ArrayList<Order>());
+    SkewedInfo si = new SkewedInfo();
+    si.setSkewedColNames(new ArrayList<String>());
+    si.setSkewedColValues(new ArrayList<List<String>>());
+    si.setSkewedColValueLocationMaps(new HashMap<List<String>, String>());
+    sd.setSkewedInfo(si);
+    sd.setStoredAsSubDirectories(false);
+    return sd;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index a4dfa3a..3b3cd7b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -74,16 +74,19 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
       LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal: dump");
       initReplDump(ast);
       analyzeReplDump(ast);
+      break;
     }
     case TOK_REPL_LOAD: {
       LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal: load");
       initReplLoad(ast);
       analyzeReplLoad(ast);
+      break;
     }
     case TOK_REPL_STATUS: {
       LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal: status");
       initReplStatus(ast);
       analyzeReplStatus(ast);
+      break;
    }
    default: {
      throw new SemanticException("Unexpected root token");
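
The new tableObjJson and partitionObjJson fields added by this patch carry the full Thrift Table and Partition objects encoded with TJSONProtocol, so a consumer of the notification log can rebuild the metastore objects without an extra metastore call. Below is a minimal sketch of that round trip, assuming only the Thrift and metastore API classes already used in the patch (TSerializer/TDeserializer with TJSONProtocol and the "UTF-8" charset, as in JSONMessageFactory and the test helper getTableObj); the TableJsonRoundTrip class and its method names are illustrative, not part of the change.

import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TJSONProtocol;

public class TableJsonRoundTrip {

  // Encode a Table the same way the patch's createTableObjJson(...) does.
  static String toJson(Table tableObj) throws TException {
    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
    return serializer.toString(tableObj, "UTF-8");
  }

  // Rebuild the Table from a message's "tableObjJson" field, mirroring the
  // test helper getTableObj(...).
  static Table fromJson(String tableJson) throws TException {
    TDeserializer deserializer = new TDeserializer(new TJSONProtocol.Factory());
    Table tableObj = new Table();
    deserializer.deserialize(tableObj, tableJson, "UTF-8");
    return tableObj;
  }
}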