diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java index 5961388..2dbcbe5 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java @@ -19,10 +19,13 @@ package org.apache.hive.hcatalog.messaging; +import org.apache.hadoop.hive.common.classification.InterfaceAudience; +import org.apache.hadoop.hive.common.classification.InterfaceStability; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hive.hcatalog.messaging.json.JSONMessageFactory; @@ -131,6 +134,16 @@ public static MessageDeserializer getDeserializer(String format, public abstract AddPartitionMessage buildAddPartitionMessage(Table table, List partitions); /** + * Factory method for AddPartitionMessage. + * @param table The Table to which the partitions are added. + * @param partitionSpec The set of Partitions being added. + * @return AddPartitionMessage instance. + */ + @InterfaceAudience.LimitedPrivate({"Hive"}) + @InterfaceStability.Evolving + public abstract AddPartitionMessage buildAddPartitionMessage(Table table, PartitionSpecProxy partitionSpec); + + /** * Factory method for DropPartitionMessage. * @param table The Table from which the partition is dropped. * @param partition The Partition being dropped. diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java index 052a7ec..8a95890 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java @@ -19,9 +19,12 @@ package org.apache.hive.hcatalog.messaging.json; +import org.apache.hadoop.hive.common.classification.InterfaceAudience; +import org.apache.hadoop.hive.common.classification.InterfaceStability; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; import org.apache.hive.hcatalog.messaging.AddPartitionMessage; import org.apache.hive.hcatalog.messaging.CreateDatabaseMessage; import org.apache.hive.hcatalog.messaging.CreateTableMessage; @@ -87,6 +90,14 @@ public AddPartitionMessage buildAddPartitionMessage(Table table, List } @Override + @InterfaceAudience.LimitedPrivate({"Hive"}) + @InterfaceStability.Evolving + public AddPartitionMessage buildAddPartitionMessage(Table table, PartitionSpecProxy partitionSpec) { + return new JSONAddPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table.getDbName(), + table.getTableName(), getPartitionKeyValues(table, partitionSpec), System.currentTimeMillis()/1000); + } + + @Override public DropPartitionMessage buildDropPartitionMessage(Table table, Partition partition) { return new JSONDropPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, partition.getDbName(), partition.getTableName(), Arrays.asList(getPartitionKeyValues(table, partition)), @@ -107,4 +118,16 @@ public DropPartitionMessage buildDropPartitionMessage(Table table, Partition par partitionList.add(getPartitionKeyValues(table, partition)); return partitionList; } + + @InterfaceAudience.LimitedPrivate({"Hive"}) + @InterfaceStability.Evolving + private static List> getPartitionKeyValues(Table table, PartitionSpecProxy partitionSpec) { + List> partitionList = new ArrayList>(); + PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator(); + while (iterator.hasNext()) { + Partition partition = iterator.next(); + partitionList.add(getPartitionKeyValues(table, partition)); + } + return partitionList; + } } diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java index ccad819..4dee79b 100644 --- a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java @@ -22,6 +22,8 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.classification.InterfaceAudience; +import org.apache.hadoop.hive.common.classification.InterfaceStability; import org.apache.hadoop.hive.metastore.api.PartitionEventType; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hive.hcatalog.common.HCatException; @@ -213,6 +215,26 @@ public abstract void updateTableSchema(String dbName, String tableName, HCatTabl public abstract List deserializePartitions(List hcatPartitionStringReps) throws HCatException; /** + * Serializer for HCatPartitionSpec. + * @param partitionSpec HCatPartitionSpec to be serialized. + * @return A list of Strings, representing the HCatPartitionSpec as a whole. + * @throws HCatException On failure to serialize. + */ + @InterfaceAudience.LimitedPrivate({"Hive"}) + @InterfaceStability.Evolving + public abstract List serializePartitionSpec(HCatPartitionSpec partitionSpec) throws HCatException; + + /** + * Deserializer for HCatPartitionSpec. + * @param hcatPartitionSpecStrings List of strings, representing the HCatPartitionSpec as a whole. + * @return HCatPartitionSpec, reconstructed from the list of strings. + * @throws HCatException On failure to deserialize. + */ + @InterfaceAudience.LimitedPrivate({"Hive"}) + @InterfaceStability.Evolving + public abstract HCatPartitionSpec deserializePartitionSpec(List hcatPartitionSpecStrings) throws HCatException; + + /** * Creates the table like an existing table. * * @param dbName The name of the database. @@ -280,6 +302,21 @@ public abstract void renameTable(String dbName, String oldName, throws HCatException; /** + * Gets partitions in terms of generic HCatPartitionSpec instances. + */ + @InterfaceAudience.LimitedPrivate({"Hive"}) + @InterfaceStability.Evolving + public abstract HCatPartitionSpec getPartitionSpecs(String dbName, String tableName, int maxPartitions) throws HCatException; + + /** + * Gets partitions in terms of generic HCatPartitionSpec instances. + */ + @InterfaceAudience.LimitedPrivate({"Hive"}) + @InterfaceStability.Evolving + public abstract HCatPartitionSpec getPartitionSpecs(String dbName, String tableName, Map partitionSelector, int maxPartitions) + throws HCatException; + + /** * Gets the partition. * * @param dbName The database name. @@ -312,6 +349,17 @@ public abstract int addPartitions(List partInfoList) throws HCatException; /** + * Adds partitions using HCatPartitionSpec. + * @param partitionSpec The HCatPartitionSpec representing the set of partitions added. + * @return The number of partitions added. + * @throws HCatException On failure to add partitions. + */ + @InterfaceAudience.LimitedPrivate({"Hive"}) + @InterfaceStability.Evolving + public abstract int addPartitionSpec(HCatPartitionSpec partitionSpec) + throws HCatException; + + /** * Drops partition(s) that match the specified (and possibly partial) partition specification. * A partial partition-specification is one where not all partition-keys have associated values. For example, * for a table ('myDb.myTable') with 2 partition keys (dt string, region string), @@ -344,6 +392,14 @@ public abstract void dropPartitions(String dbName, String tableName, String filter) throws HCatException; /** + * List partitions by filter, but as HCatPartitionSpecs. + */ + @InterfaceAudience.LimitedPrivate({"Hive"}) + @InterfaceStability.Evolving + public abstract HCatPartitionSpec listPartitionSpecsByFilter(String dbName, String tblName, + String filter, int maxPartitions) throws HCatException; + + /** * Mark partition for event. * * @param dbName The database name. diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java index 41cb44e..7e81113 100644 --- a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java @@ -25,6 +25,8 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.classification.InterfaceAudience; +import org.apache.hadoop.hive.common.classification.InterfaceStability; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.MetaStoreUtils; @@ -353,6 +355,31 @@ public void renameTable(String dbName, String oldName, String newName) return listPartitionsByFilter(dbName, tblName, getFilterString(partitionSpec)); } + @Override + @InterfaceAudience.LimitedPrivate({"Hive"}) + @InterfaceStability.Evolving + public HCatPartitionSpec getPartitionSpecs(String dbName, String tableName, int maxPartitions) throws HCatException { + try { + return new HCatPartitionSpec(getTable(dbName, tableName), + hmsClient.listPartitionSpecs(dbName, tableName, maxPartitions)); + } + catch (NoSuchObjectException e) { + throw new ObjectNotFoundException( + "NoSuchObjectException while retrieving partition.", e); + } catch (MetaException e) { + throw new HCatException( + "MetaException while retrieving partition.", e); + } catch (TException e) { + throw new ConnectionFailureException( + "TException while retrieving partition.", e); + } + } + + @Override + public HCatPartitionSpec getPartitionSpecs(String dbName, String tableName, Map partitionSelector, int maxPartitions) throws HCatException { + return listPartitionSpecsByFilter(dbName, tableName, getFilterString(partitionSelector), maxPartitions); + } + private static String getFilterString(Map partitionSpec) { final String AND = " AND "; @@ -413,7 +440,7 @@ public void addPartition(HCatAddPartitionDesc partInfo) Table tbl = null; try { tbl = hmsClient.getTable(partInfo.getDatabaseName(), - partInfo.getTableName()); + partInfo.getTableName()); // TODO: Should be moved out. if (tbl.getPartitionKeysSize() == 0) { throw new HCatException("The table " + partInfo.getTableName() @@ -511,6 +538,28 @@ private void dropPartition(Partition partition, boolean ifExists) } @Override + @InterfaceAudience.LimitedPrivate({"Hive"}) + @InterfaceStability.Evolving + public HCatPartitionSpec listPartitionSpecsByFilter(String dbName, String tblName, String filter, int maxPartitions) + throws HCatException { + try { + return new HCatPartitionSpec(getTable(dbName, tblName), + hmsClient.listPartitionSpecsByFilter(dbName, tblName, filter, maxPartitions)); + } + catch(MetaException e) { + throw new HCatException("MetaException while fetching partitions.", e); + } + catch (NoSuchObjectException e) { + throw new ObjectNotFoundException( + "NoSuchObjectException while fetching partitions.", e); + } + catch (TException e) { + throw new ConnectionFailureException( + "TException while fetching partitions.", e); + } + } + + @Override public void markPartitionForEvent(String dbName, String tblName, Map partKVs, PartitionEventType eventType) throws HCatException { @@ -572,7 +621,7 @@ public String getDelegationToken(String owner, String token = null; try { token = hmsClient.getDelegationToken(owner, - renewerKerberosPrincipalName); + renewerKerberosPrincipalName); } catch (MetaException e) { throw new HCatException( "MetaException while getting delegation token.", e); @@ -750,6 +799,30 @@ public int addPartitions(List partInfoList) } @Override + @InterfaceAudience.LimitedPrivate({"Hive"}) + @InterfaceStability.Evolving + public int addPartitionSpec(HCatPartitionSpec partitionSpec) throws HCatException { + + try { + return hmsClient.add_partitions_pspec(partitionSpec.toPartitionSpecProxy()); + } catch (InvalidObjectException e) { + throw new HCatException( + "InvalidObjectException while adding partition.", e); + } catch (AlreadyExistsException e) { + throw new HCatException( + "AlreadyExistsException while adding partition.", e); + } catch (MetaException e) { + throw new HCatException("MetaException while adding partition.", e); + } catch (NoSuchObjectException e) { + throw new ObjectNotFoundException("The table " + + "could not be found.", e); + } catch (TException e) { + throw new ConnectionFailureException( + "TException while adding partition.", e); + } + } + + @Override public String getMessageBusTopicName(String dbName, String tableName) throws HCatException { try { return hmsClient.getTable(dbName, tableName).getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME); @@ -824,4 +897,16 @@ public HCatPartition deserializePartition(String hcatPartitionStringRep) throws } return partitions; } + + @Override + public List serializePartitionSpec(HCatPartitionSpec partitionSpec) throws HCatException { + return MetadataSerializer.get().serializePartitionSpec(partitionSpec); + } + + @Override + public HCatPartitionSpec deserializePartitionSpec(List hcatPartitionSpecStrings) throws HCatException { + HCatPartitionSpec hcatPartitionSpec = MetadataSerializer.get().deserializePartitionSpec(hcatPartitionSpecStrings); + hcatPartitionSpec.hcatTable(getTable(hcatPartitionSpec.getDbName(), hcatPartitionSpec.getTableName())); + return hcatPartitionSpec; + } } diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatPartitionSpec.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatPartitionSpec.java new file mode 100644 index 0000000..d55cebd --- /dev/null +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatPartitionSpec.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.api; + +import org.apache.hadoop.hive.common.classification.InterfaceAudience; +import org.apache.hadoop.hive.common.classification.InterfaceStability; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; +import org.apache.hive.hcatalog.common.HCatException; + +/** + * Generalized representation of a set of HCatPartitions. + */ + +@InterfaceAudience.LimitedPrivate({"Hive"}) +@InterfaceStability.Evolving +public class HCatPartitionSpec { + + protected HCatTable hcatTable; + protected PartitionSpecProxy partitionSpecProxy; + + protected HCatPartitionSpec(HCatTable hcatTable, PartitionSpecProxy partitionSpecProxy) throws HCatException { + this.hcatTable = hcatTable; + this.partitionSpecProxy = partitionSpecProxy; + assert_invariant(); + } + + /** + * Getter for DBName of this HCatPartitionSpec. + * @return The name of the DB. + */ + public String getDbName() { + return partitionSpecProxy.getDbName(); + } + + /** + * Getter for TableName of this HCatPartitionSpec. + * @return The name of the TableName. + */ + public String getTableName() { + return partitionSpecProxy.getTableName(); + } + + /** + * Setter for HCatTable. Required for deserialization. + */ + void hcatTable(HCatTable hcatTable) throws HCatException { + + assert this.hcatTable == null : "Expected hcatTable to be null at this point."; + this.hcatTable = hcatTable; + assert_invariant(); + + } + + /** + * Conversion to a Hive Metastore API PartitionSpecProxy instance. + */ + PartitionSpecProxy toPartitionSpecProxy() { + return partitionSpecProxy; + } + + /** + * Getter for the number of HCatPartitions represented by this HCatPartitionSpec instance. + * @return The number of HCatPartitions. + * @throws HCatException On failure. + */ + public int size() throws HCatException { + return partitionSpecProxy.size(); + } + + /** + * Setter for the "root" location of the HCatPartitionSpec. + * @param location The new "root" location of the HCatPartitionSpec. + * @throws HCatException On failure to set a new location. + */ + public void setRootLocation(String location) throws HCatException { + try { + partitionSpecProxy.setRootLocation(location); + } + catch (MetaException metaException) { + throw new HCatException("Unable to set root-path!", metaException); + } + } + + /** + * Getter for an Iterator to the first HCatPartition in the HCatPartitionSpec. + * @return HCatPartitionIterator to the first HCatPartition. + */ + public HCatPartitionIterator getPartitionIterator() { + return new HCatPartitionIterator(hcatTable, partitionSpecProxy.getPartitionIterator()); + } + + // Assert class invariant. + private void assert_invariant() throws HCatException { + + if (hcatTable != null) { + + if (!hcatTable.getDbName().equalsIgnoreCase(partitionSpecProxy.getDbName())) { + String errorMessage = "Invalid HCatPartitionSpec instance: Table's DBName (" + hcatTable.getDbName() + ") " + + "doesn't match PartitionSpec (" + partitionSpecProxy.getDbName() + ")"; + assert false : errorMessage; + throw new HCatException(errorMessage); + } + + if (!hcatTable.getTableName().equalsIgnoreCase(partitionSpecProxy.getTableName())) { + String errorMessage = "Invalid HCatPartitionSpec instance: Table's TableName (" + hcatTable.getTableName() + ") " + + "doesn't match PartitionSpec (" + partitionSpecProxy.getTableName() + ")"; + assert false : errorMessage; + throw new HCatException(errorMessage); + } + } + } + + + /** + * Iterator over HCatPartitions in the HCatPartitionSpec. + */ + public static class HCatPartitionIterator { // implements java.util.Iterator { + + private HCatTable hcatTable; + private PartitionSpecProxy.PartitionIterator iterator; + + HCatPartitionIterator(HCatTable hcatTable, PartitionSpecProxy.PartitionIterator iterator) { + this.hcatTable = hcatTable; + this.iterator = iterator; + } + + public boolean hasNext() { + return iterator.hasNext(); + } + + public HCatPartition next() throws HCatException { + return new HCatPartition(hcatTable, iterator.next()); + } + + public void remove() { + iterator.remove(); + } + + } // class HCatPartitionIterator; + +} // class HCatPartitionSpec; diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/MetadataJSONSerializer.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/MetadataJSONSerializer.java index 30ac00f..d98cea5 100644 --- a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/MetadataJSONSerializer.java +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/MetadataJSONSerializer.java @@ -1,7 +1,11 @@ package org.apache.hive.hcatalog.api; +import org.apache.hadoop.hive.common.classification.InterfaceAudience; +import org.apache.hadoop.hive.common.classification.InterfaceStability; import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.PartitionSpec; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; import org.apache.hive.hcatalog.common.HCatException; import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; @@ -10,6 +14,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; + /** * MetadataSerializer implementation, that serializes HCat API elements into JSON. */ @@ -68,4 +75,38 @@ public HCatPartition deserializePartition(String hcatPartitionStringRep) throws throw new HCatException("Could not de-serialize HCatPartition.", exception); } } + + @Override + @InterfaceAudience.LimitedPrivate({"Hive"}) + @InterfaceStability.Evolving + public List serializePartitionSpec(HCatPartitionSpec hcatPartitionSpec) throws HCatException { + try { + List stringReps = new ArrayList(); + TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); + for (PartitionSpec partitionSpec : hcatPartitionSpec.partitionSpecProxy.toPartitionSpec()) { + stringReps.add(serializer.toString(partitionSpec, "UTF-8")); + } + return stringReps; + } + catch (TException serializationException) { + throw new HCatException("Failed to serialize!", serializationException); + } + } + + @Override + public HCatPartitionSpec deserializePartitionSpec(List hcatPartitionSpecStrings) throws HCatException { + try { + List partitionSpecList = new ArrayList(); + TDeserializer deserializer = new TDeserializer(new TJSONProtocol.Factory()); + for (String stringRep : hcatPartitionSpecStrings) { + PartitionSpec partSpec = new PartitionSpec(); + deserializer.deserialize(partSpec, stringRep, "UTF-8"); + partitionSpecList.add(partSpec); + } + return new HCatPartitionSpec(null, PartitionSpecProxy.Factory.get(partitionSpecList)); + } + catch (TException deserializationException) { + throw new HCatException("Failed to deserialize!", deserializationException); + } + } } diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/MetadataSerializer.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/MetadataSerializer.java index dd5da99..2ecf503 100644 --- a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/MetadataSerializer.java +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/MetadataSerializer.java @@ -1,7 +1,11 @@ package org.apache.hive.hcatalog.api; +import org.apache.hadoop.hive.common.classification.InterfaceAudience; +import org.apache.hadoop.hive.common.classification.InterfaceStability; import org.apache.hive.hcatalog.common.HCatException; +import java.util.List; + /** * Interface to serialize HCat API elements. */ @@ -51,4 +55,24 @@ public static MetadataSerializer get() throws HCatException { */ public abstract HCatPartition deserializePartition(String hcatPartitionStringRep) throws HCatException; + /** + * Serializer for HCatPartitionSpec. + * @param hcatPartitionSpec HCatPartitionSpec instance to be serialized. + * @return Serialized string-representations. + * @throws HCatException On failure to serialize. + */ + @InterfaceAudience.LimitedPrivate({"Hive"}) + @InterfaceStability.Evolving + public abstract List serializePartitionSpec(HCatPartitionSpec hcatPartitionSpec) throws HCatException; + + /** + * Deserializer for HCatPartitionSpec string-representations. + * @param hcatPartitionSpecStrings List of strings to be converted into an HCatPartitionSpec. + * @return Deserialized HCatPartitionSpec instance. + * @throws HCatException On failure to deserialize. (e.g. incompatible serialization format, etc.) + */ + @InterfaceAudience.LimitedPrivate({"Hive"}) + @InterfaceStability.Evolving + public abstract HCatPartitionSpec deserializePartitionSpec(List hcatPartitionSpecStrings) throws HCatException; + } diff --git a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java index 6f5bff8..321a86d 100644 --- a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java +++ b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java @@ -975,7 +975,7 @@ public void testPartitionRegistrationWithCustomSchema() throws Exception { sourceMetaStore.addPartition(HCatAddPartitionDesc.create(sourcePartition_2).build()); // The source table now has 2 partitions, one in TEXTFILE, the other in ORC. - // Test that adding these partitions to the target-table *without* replicating the table-change. + // Test adding these partitions to the target-table *without* replicating the table-change. List sourcePartitions = sourceMetaStore.getPartitions(dbName, tableName); assertEquals("Unexpected number of source partitions.", 2, sourcePartitions.size()); @@ -1007,4 +1007,139 @@ public void testPartitionRegistrationWithCustomSchema() throws Exception { assertTrue("Unexpected exception! " + unexpected.getMessage(), false); } } + + /** + * Test that partition-definitions can be replicated between HCat-instances, + * independently of table-metadata replication, using PartitionSpec interfaces. + * (This is essentially the same test as testPartitionRegistrationWithCustomSchema(), + * transliterated to use the PartitionSpec APIs.) + * 2 identical tables are created on 2 different HCat instances ("source" and "target"). + * On the source instance, + * 1. One partition is added with the old format ("TEXTFILE"). + * 2. The table is updated with an additional column and the data-format changed to ORC. + * 3. Another partition is added with the new format. + * 4. The partitions' metadata is copied to the target HCat instance, without updating the target table definition. + * 5. The partitions' metadata is tested to be an exact replica of that on the source. + * @throws Exception + */ + @Test + public void testPartitionSpecRegistrationWithCustomSchema() throws Exception { + try { + startReplicationTargetMetaStoreIfRequired(); + + HCatClient sourceMetaStore = HCatClient.create(new Configuration(hcatConf)); + final String dbName = "myDb"; + final String tableName = "myTable"; + + sourceMetaStore.dropDatabase(dbName, true, HCatClient.DropDBMode.CASCADE); + + sourceMetaStore.createDatabase(HCatCreateDBDesc.create(dbName).build()); + List columnSchema = new ArrayList( + Arrays.asList(new HCatFieldSchema("foo", Type.INT, ""), + new HCatFieldSchema("bar", Type.STRING, ""))); + + List partitionSchema = Arrays.asList(new HCatFieldSchema("dt", Type.STRING, ""), + new HCatFieldSchema("grid", Type.STRING, "")); + + HCatTable sourceTable = new HCatTable(dbName, tableName).cols(columnSchema) + .partCols(partitionSchema) + .comment("Source table."); + + sourceMetaStore.createTable(HCatCreateTableDesc.create(sourceTable).build()); + + // Verify that the sourceTable was created successfully. + sourceTable = sourceMetaStore.getTable(dbName, tableName); + assertNotNull("Table couldn't be queried for. ", sourceTable); + + // Partitions added now should inherit table-schema, properties, etc. + Map partitionSpec_1 = new HashMap(); + partitionSpec_1.put("grid", "AB"); + partitionSpec_1.put("dt", "2011_12_31"); + HCatPartition sourcePartition_1 = new HCatPartition(sourceTable, partitionSpec_1, ""); + + sourceMetaStore.addPartition(HCatAddPartitionDesc.create(sourcePartition_1).build()); + assertEquals("Unexpected number of partitions. ", + sourceMetaStore.getPartitions(dbName, tableName).size(), 1); + // Verify that partition_1 was added correctly, and properties were inherited from the HCatTable. + HCatPartition addedPartition_1 = sourceMetaStore.getPartition(dbName, tableName, partitionSpec_1); + assertEquals("Column schema doesn't match.", addedPartition_1.getColumns(), sourceTable.getCols()); + assertEquals("InputFormat doesn't match.", addedPartition_1.getInputFormat(), sourceTable.getInputFileFormat()); + assertEquals("OutputFormat doesn't match.", addedPartition_1.getOutputFormat(), sourceTable.getOutputFileFormat()); + assertEquals("SerDe doesn't match.", addedPartition_1.getSerDe(), sourceTable.getSerdeLib()); + assertEquals("SerDe params don't match.", addedPartition_1.getSerdeParams(), sourceTable.getSerdeParams()); + + // Replicate table definition. + + HCatClient targetMetaStore = HCatClient.create(new Configuration(replicationTargetHCatConf)); + targetMetaStore.dropDatabase(dbName, true, HCatClient.DropDBMode.CASCADE); + + targetMetaStore.createDatabase(HCatCreateDBDesc.create(dbName).build()); + // Make a copy of the source-table, as would be done across class-loaders. + HCatTable targetTable = targetMetaStore.deserializeTable(sourceMetaStore.serializeTable(sourceTable)); + targetMetaStore.createTable(HCatCreateTableDesc.create(targetTable).build()); + targetTable = targetMetaStore.getTable(dbName, tableName); + + assertEquals("Created table doesn't match the source.", + targetTable.diff(sourceTable), HCatTable.NO_DIFF); + + // Modify Table schema at the source. + List newColumnSchema = new ArrayList(columnSchema); + newColumnSchema.add(new HCatFieldSchema("goo_new", Type.DOUBLE, "")); + Map tableParams = new HashMap(1); + tableParams.put("orc.compress", "ZLIB"); + sourceTable.cols(newColumnSchema) // Add a column. + .fileFormat("orcfile") // Change SerDe, File I/O formats. + .tblProps(tableParams) + .serdeParam(serdeConstants.FIELD_DELIM, Character.toString('\001')); + sourceMetaStore.updateTableSchema(dbName, tableName, sourceTable); + sourceTable = sourceMetaStore.getTable(dbName, tableName); + + // Add another partition to the source. + Map partitionSpec_2 = new HashMap(); + partitionSpec_2.put("grid", "AB"); + partitionSpec_2.put("dt", "2012_01_01"); + HCatPartition sourcePartition_2 = new HCatPartition(sourceTable, partitionSpec_2, ""); + sourceMetaStore.addPartition(HCatAddPartitionDesc.create(sourcePartition_2).build()); + + // The source table now has 2 partitions, one in TEXTFILE, the other in ORC. + // Test adding these partitions to the target-table *without* replicating the table-change. + + HCatPartitionSpec sourcePartitionSpec = sourceMetaStore.getPartitionSpecs(dbName, tableName, -1); + assertEquals("Unexpected number of source partitions.", 2, sourcePartitionSpec.size()); + + // Serialize the hcatPartitionSpec. + List partitionSpecString = sourceMetaStore.serializePartitionSpec(sourcePartitionSpec); + + // Deserialize the HCatPartitionSpec using the target HCatClient instance. + HCatPartitionSpec targetPartitionSpec = targetMetaStore.deserializePartitionSpec(partitionSpecString); + assertEquals("Could not add the expected number of partitions.", + sourcePartitionSpec.size(), targetMetaStore.addPartitionSpec(targetPartitionSpec)); + + // Retrieve partitions. + targetPartitionSpec = targetMetaStore.getPartitionSpecs(dbName, tableName, -1); + assertEquals("Could not retrieve the expected number of partitions.", + sourcePartitionSpec.size(), targetPartitionSpec.size()); + + // Assert that the source and target partitions are equivalent. + HCatPartitionSpec.HCatPartitionIterator sourceIterator = sourcePartitionSpec.getPartitionIterator(); + HCatPartitionSpec.HCatPartitionIterator targetIterator = targetPartitionSpec.getPartitionIterator(); + + while (targetIterator.hasNext()) { + assertTrue("Fewer target partitions than source.", sourceIterator.hasNext()); + HCatPartition sourcePartition = sourceIterator.next(); + HCatPartition targetPartition = targetIterator.next(); + assertEquals("Column schema doesn't match.", sourcePartition.getColumns(), targetPartition.getColumns()); + assertEquals("InputFormat doesn't match.", sourcePartition.getInputFormat(), targetPartition.getInputFormat()); + assertEquals("OutputFormat doesn't match.", sourcePartition.getOutputFormat(), targetPartition.getOutputFormat()); + assertEquals("SerDe doesn't match.", sourcePartition.getSerDe(), targetPartition.getSerDe()); + assertEquals("SerDe params don't match.", sourcePartition.getSerdeParams(), targetPartition.getSerdeParams()); + + } + } + catch (Exception unexpected) { + LOG.error( "Unexpected exception! ", unexpected); + assertTrue("Unexpected exception! " + unexpected.getMessage(), false); + } + } + }