Index: metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java (revision 1616367) +++ metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java (revision ) @@ -509,6 +509,23 @@ } /** + * @param location the location (path) whose file statuses are required + * @return array of FileStatus objects corresponding to the files + * making up the passed location + */ + public FileStatus[] getFileStatusesForLocation(String location) + throws MetaException { + try { + Path path = new Path(location); + FileSystem fileSys = path.getFileSystem(conf); + return HiveStatsUtils.getFileStatusRecurse(path, -1, fileSys); + } catch (IOException ioe) { + MetaStoreUtils.logAndThrowMetaException(ioe); + } + return null; + } + + /** * @param table * @return array of FileStatus objects corresponding to the files making up the passed * unpartitioned table Index: metastore/src/test/org/apache/hadoop/hive/metastore/MockPartitionExpressionForMetastore.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- metastore/src/test/org/apache/hadoop/hive/metastore/MockPartitionExpressionForMetastore.java (revision ) +++ metastore/src/test/org/apache/hadoop/hive/metastore/MockPartitionExpressionForMetastore.java (revision ) @@ -0,0 +1,20 @@ +package org.apache.hadoop.hive.metastore; + +import org.apache.hadoop.hive.metastore.api.MetaException; + +import java.util.List; + +/** + * Test Mock-out for PartitionExpressionForMetastore. + */ +public class MockPartitionExpressionForMetastore implements PartitionExpressionProxy { + @Override + public String convertExprToFilter(byte[] expr) throws MetaException { + return null; + } + + @Override + public boolean filterPartitionsByExpr(List<String> columnNames, byte[] expr, String defaultPartitionName, List<String> partitionNames) throws MetaException { + return false; + } +} Index: metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java (revision 1616367) +++ metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java (revision ) @@ -54,6 +54,7 @@ import org.apache.hadoop.hive.metastore.model.MRoleMap; import org.apache.hadoop.hive.metastore.model.MTableColumnPrivilege; import org.apache.hadoop.hive.metastore.model.MTablePrivilege; +import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; import org.apache.thrift.TException; /** @@ -672,6 +673,11 @@ public boolean addPartitions(String dbName, String tblName, List<Partition> parts) throws InvalidObjectException, MetaException { return objectStore.addPartitions(dbName, tblName, parts); + } + + @Override + public boolean addPartitions(String dbName, String tblName, PartitionSpecProxy partitionSpec, boolean ifNotExists) throws InvalidObjectException, MetaException { + return false; + } @Override Index: metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 ===================================================================
--- metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java (revision 1616367) +++ metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java (revision ) @@ -55,6 +55,7 @@ import org.apache.hadoop.hive.metastore.model.MRoleMap; import org.apache.hadoop.hive.metastore.model.MTableColumnPrivilege; import org.apache.hadoop.hive.metastore.model.MTablePrivilege; +import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; import org.apache.thrift.TException; /** @@ -693,6 +694,11 @@ @Override public boolean addPartitions(String dbName, String tblName, List parts) throws InvalidObjectException, MetaException { + return false; + } + + @Override + public boolean addPartitions(String dbName, String tblName, PartitionSpecProxy partitionSpec, boolean ifNotExists) throws InvalidObjectException, MetaException { return false; } Index: metastore/src/java/org/apache/hadoop/hive/metastore/partition/spec/PartitionListComposingSpecProxy.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/partition/spec/PartitionListComposingSpecProxy.java (revision ) +++ metastore/src/java/org/apache/hadoop/hive/metastore/partition/spec/PartitionListComposingSpecProxy.java (revision ) @@ -0,0 +1,153 @@ +package org.apache.hadoop.hive.metastore.partition.spec; + +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.PartitionSpec; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +/** + * PartitionSpecProxy implementation that composes a List of Partitions. + */ +public class PartitionListComposingSpecProxy extends PartitionSpecProxy { + + private PartitionSpec partitionSpec; + + protected PartitionListComposingSpecProxy(PartitionSpec partitionSpec) { + assert partitionSpec.isSetPartitionList() + : "Partition-list should have been set."; + this.partitionSpec = partitionSpec; + } + + @Override + public String getDbName() { + return partitionSpec.getDbName(); + } + + @Override + public String getTableName() { + return partitionSpec.getTableName(); + } + + @Override + public PartitionIterator getPartitionIterator() { + return new Iterator(this); + } + + @Override + public List toPartitionSpec() { + return Arrays.asList(partitionSpec); + } + + @Override + public int size() { + return partitionSpec.getPartitionList().getPartitionsSize(); + } + + @Override + public void setDbName(String dbName) { + partitionSpec.setDbName(dbName); + for (Partition partition : partitionSpec.getPartitionList().getPartitions()) { + partition.setDbName(dbName); + } + } + + @Override + public void setTableName(String tableName) { + partitionSpec.setTableName(tableName); + for (Partition partition : partitionSpec.getPartitionList().getPartitions()) { + partition.setTableName(tableName); + } + } + + @Override + public void setRootLocation(String newRootPath) throws MetaException { + + String oldRootPath = partitionSpec.getRootPath(); + + if (oldRootPath == null) { + throw new MetaException("No common root-path. 
Can't replace root-path!"); + } + + for (Partition partition : partitionSpec.getPartitionList().getPartitions()) { + String location = partition.getSd().getLocation(); + if (location.startsWith(oldRootPath)) { + partition.getSd().setLocation(location.replace(oldRootPath, newRootPath)); + } + else { + throw new MetaException("Common root-path not found. Can't replace root-path!"); + } + } + } + + public static class Iterator implements PartitionIterator { + + PartitionListComposingSpecProxy partitionSpecProxy; + List partitionList; + int index; + + public Iterator(PartitionListComposingSpecProxy partitionSpecProxy) { + this.partitionSpecProxy = partitionSpecProxy; + this.partitionList = partitionSpecProxy.partitionSpec.getPartitionList().getPartitions(); + this.index = 0; + } + + @Override + public Partition getCurrent() { + return partitionList.get(index); + } + + @Override + public String getDbName() { + return partitionSpecProxy.getDbName(); + } + + @Override + public String getTableName() { + return partitionSpecProxy.getTableName(); + } + + @Override + public Map getParameters() { + return partitionList.get(index).getParameters(); + } + + @Override + public void setParameters(Map parameters) { + partitionList.get(index).setParameters(parameters); + } + + @Override + public String getLocation() { + return partitionList.get(index).getSd().getLocation(); + } + + @Override + public void putToParameters(String key, String value) { + partitionList.get(index).putToParameters(key, value); + } + + @Override + public void setCreateTime(long time) { + partitionList.get(index).setCreateTime((int)time); + } + + @Override + public boolean hasNext() { + return index < partitionList.size(); + } + + @Override + public Partition next() { + return partitionList.get(index++); + } + + @Override + public void remove() { + partitionList.remove(index); + } + } // class Iterator; + +} // class PartitionListComposingSpecProxy; Index: metastore/if/hive_metastore.thrift IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- metastore/if/hive_metastore.thrift (revision 1616367) +++ metastore/if/hive_metastore.thrift (revision ) @@ -273,6 +273,32 @@ 8: optional PrincipalPrivilegeSet privileges } +struct PartitionWithoutSD { + 1: list values // string value is converted to appropriate partition key type + 2: i32 createTime, + 3: i32 lastAccessTime, + 4: string relativePath, + 5: map parameters, + 6: optional PrincipalPrivilegeSet privileges +} + +struct PartitionSpecWithSharedSD { + 1: list partitions, + 2: StorageDescriptor sd, +} + +struct PartitionListComposingSpec { + 1: list partitions +} + +struct PartitionSpec { + 1: string dbName, + 2: string tableName, + 3: string rootPath, + 4: optional PartitionSpecWithSharedSD sharedSDPartitionSpec, + 5: optional PartitionListComposingSpec partitionList +} + struct Index { 1: string indexName, // unique with in the whole database namespace 2: string indexHandlerClass, // reserved @@ -786,6 +812,8 @@ 3:MetaException o3) i32 add_partitions(1:list new_parts) throws(1:InvalidObjectException o1, 2:AlreadyExistsException o2, 3:MetaException o3) + i32 add_partitions_pspec(1:list new_parts) + throws(1:InvalidObjectException o1, 2:AlreadyExistsException o2, 3:MetaException o3) Partition append_partition(1:string db_name, 2:string tbl_name, 3:list part_vals) throws (1:InvalidObjectException o1, 2:AlreadyExistsException o2, 3:MetaException o3) AddPartitionsResult 
add_partitions_req(1:AddPartitionsRequest request) @@ -831,6 +859,9 @@ list<Partition> get_partitions_with_auth(1:string db_name, 2:string tbl_name, 3:i16 max_parts=-1, 4: string user_name, 5: list<string> group_names) throws(1:NoSuchObjectException o1, 2:MetaException o2) + list<PartitionSpec> get_partitions_pspec(1:string db_name, 2:string tbl_name, 3:i32 max_parts=-1) + throws(1:NoSuchObjectException o1, 2:MetaException o2) + list<string> get_partition_names(1:string db_name, 2:string tbl_name, 3:i16 max_parts=-1) throws(1:MetaException o2) @@ -853,6 +884,11 @@ // get the partitions matching the given partition filter list<Partition> get_partitions_by_filter(1:string db_name 2:string tbl_name 3:string filter, 4:i16 max_parts=-1) + throws(1:MetaException o1, 2:NoSuchObjectException o2) + + // List partitions as PartitionSpec instances. + list<PartitionSpec> get_part_specs_by_filter(1:string db_name 2:string tbl_name + 3:string filter, 4:i32 max_parts=-1) throws(1:MetaException o1, 2:NoSuchObjectException o2) // get the partitions matching the given partition filter Index: metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java (revision 1616367) +++ metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java (revision ) @@ -57,6 +57,7 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.SerDeException; @@ -294,6 +295,40 @@ // populate those statistics that don't require a full scan of the data. LOG.warn("Updating partition stats fast for: " + part.getTableName()); FileStatus[] fileStatus = wh.getFileStatusesForSD(part.getSd()); + populateQuickStats(fileStatus, params); + LOG.warn("Updated size to " + params.get(StatsSetupConst.TOTAL_SIZE)); + if(!params.containsKey(StatsSetupConst.STATS_GENERATED_VIA_STATS_TASK)) { + // invalidate stats requiring scan since this is a regular ddl alter case + for (String stat : StatsSetupConst.statsRequireCompute) { + params.put(stat, "-1"); + } + params.put(StatsSetupConst.COLUMN_STATS_ACCURATE, StatsSetupConst.FALSE); + } else { + params.remove(StatsSetupConst.STATS_GENERATED_VIA_STATS_TASK); + params.put(StatsSetupConst.COLUMN_STATS_ACCURATE, StatsSetupConst.TRUE); + } + } + part.setParameters(params); + updated = true; + } + return updated; + } + + public static boolean updatePartitionStatsFast(PartitionSpecProxy.PartitionIterator part, Warehouse wh, + boolean madeDir, boolean forceRecompute) throws MetaException { + Map<String, String> params = part.getParameters(); + boolean updated = false; + if (forceRecompute || + params == null || + !containsAllFastStats(params)) { + if (params == null) { + params = new HashMap<String, String>(); + } + if (!madeDir) { + // The partition location already existed and may contain data. Let's try to + // populate those statistics that don't require a full scan of the data.
+ LOG.warn("Updating partition stats fast for: " + part.getTableName()); + FileStatus[] fileStatus = wh.getFileStatusesForLocation(part.getLocation()); populateQuickStats(fileStatus, params); LOG.warn("Updated size to " + params.get(StatsSetupConst.TOTAL_SIZE)); if(!params.containsKey(StatsSetupConst.STATS_GENERATED_VIA_STATS_TASK)) { Index: metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java (revision 1616367) +++ metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java (revision ) @@ -28,10 +28,12 @@ import org.apache.hadoop.hive.metastore.api.NoSuchLockException; import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.PartitionSpec; import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; import org.apache.hadoop.hive.metastore.api.TxnAbortedException; import org.apache.hadoop.hive.metastore.api.TxnOpenException; +import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; import org.apache.thrift.TException; import java.util.List; @@ -360,6 +362,9 @@ public int add_partitions(List partitions) throws InvalidObjectException, AlreadyExistsException, MetaException, TException; + public int add_partitions_pspec(PartitionSpecProxy partitionSpec) + throws InvalidObjectException, AlreadyExistsException, MetaException, TException; + /** * Add partitions to the table. * @@ -440,6 +445,9 @@ public List listPartitions(String db_name, String tbl_name, short max_parts) throws NoSuchObjectException, MetaException, TException; + public PartitionSpecProxy listPartitionSpecs(String dbName, String tableName, int maxParts) + throws TException; + public List listPartitions(String db_name, String tbl_name, List part_vals, short max_parts) throws NoSuchObjectException, MetaException, TException; @@ -468,6 +476,9 @@ String filter, short max_parts) throws MetaException, NoSuchObjectException, TException; + public PartitionSpecProxy listPartitionSpecsByFilter(String db_name, String tbl_name, + String filter, int max_parts) throws MetaException, + NoSuchObjectException, TException; /** * Get list of partitions matching specified serialized expression Index: metastore/src/java/org/apache/hadoop/hive/metastore/partition/spec/PartitionSpecWithSharedSDProxy.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/partition/spec/PartitionSpecWithSharedSDProxy.java (revision ) +++ metastore/src/java/org/apache/hadoop/hive/metastore/partition/spec/PartitionSpecWithSharedSDProxy.java (revision ) @@ -0,0 +1,154 @@ +package org.apache.hadoop.hive.metastore.partition.spec; + +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.PartitionSpec; +import org.apache.hadoop.hive.metastore.api.PartitionSpecWithSharedSD; +import org.apache.hadoop.hive.metastore.api.PartitionWithoutSD; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; + +import java.util.Arrays; +import java.util.List; 
+import java.util.Map; + +/** + * Subclass of PartitionSpecProxy that pulls out commonality of + * StorageDescriptor properties within a Partition-list into a common + * StorageDescriptor instance. + */ +public class PartitionSpecWithSharedSDProxy extends PartitionSpecProxy { + + private PartitionSpec partitionSpec; + + public PartitionSpecWithSharedSDProxy(PartitionSpec partitionSpec) { + assert partitionSpec.isSetSharedSDPartitionSpec(); + this.partitionSpec = partitionSpec; + } + + @Override + public int size() { + return partitionSpec.getSharedSDPartitionSpec().getPartitionsSize(); + } + + @Override + public void setDbName(String dbName) { + partitionSpec.setDbName(dbName); + } + + @Override + public void setTableName(String tableName) { + partitionSpec.setTableName(tableName); + } + + @Override + public String getDbName() { + return partitionSpec.getDbName(); + } + + @Override + public String getTableName() { + return partitionSpec.getTableName(); + } + + public PartitionIterator getPartitionIterator() { + return new Iterator(this); + } + + @Override + public List toPartitionSpec() { + return Arrays.asList(partitionSpec); + } + + @Override + public void setRootLocation(String rootLocation) throws MetaException { + partitionSpec.setRootPath(rootLocation); + partitionSpec.getSharedSDPartitionSpec().getSd().setLocation(rootLocation); + } + + /** + * Iterator implementation to iterate over all Partitions within the PartitionSpecWithSharedSDProxy. + */ + public static class Iterator implements PartitionIterator { + + private PartitionSpecWithSharedSDProxy partitionSpecWithSharedSDProxy; + private PartitionSpecWithSharedSD pSpec; + private int index; + + Iterator(PartitionSpecWithSharedSDProxy partitionSpecWithSharedSDProxy) { + this.partitionSpecWithSharedSDProxy = partitionSpecWithSharedSDProxy; + this.pSpec = this.partitionSpecWithSharedSDProxy.partitionSpec.getSharedSDPartitionSpec(); + this.index = 0; + } + + @Override + public boolean hasNext() { + return index < pSpec.getPartitions().size(); + } + + @Override + public Partition next() { + Partition partition = getCurrent(); + ++index; + return partition; + } + + @Override + public void remove() { + pSpec.getPartitions().remove(index); + } + + @Override + public Partition getCurrent() { + PartitionWithoutSD partWithoutSD = pSpec.getPartitions().get(index); + StorageDescriptor partSD = new StorageDescriptor(pSpec.getSd()); + partSD.setLocation(partSD.getLocation() + partWithoutSD.getRelativePath()); + + return new Partition( + partWithoutSD.getValues(), + partitionSpecWithSharedSDProxy.partitionSpec.getDbName(), + partitionSpecWithSharedSDProxy.partitionSpec.getTableName(), + partWithoutSD.getCreateTime(), + partWithoutSD.getLastAccessTime(), + partSD, + partWithoutSD.getParameters() + ); + } + + @Override + public String getDbName() { + return partitionSpecWithSharedSDProxy.partitionSpec.getDbName(); + } + + @Override + public String getTableName() { + return partitionSpecWithSharedSDProxy.partitionSpec.getTableName(); + } + + @Override + public Map getParameters() { + return pSpec.getPartitions().get(index).getParameters(); + } + + @Override + public void setParameters(Map parameters) { + pSpec.getPartitions().get(index).setParameters(parameters); + } + + @Override + public String getLocation() { + return pSpec.getSd().getLocation() + pSpec.getPartitions().get(index).getRelativePath(); + } + + @Override + public void putToParameters(String key, String value) { + pSpec.getPartitions().get(index).putToParameters(key, value); + } + + 
@Override + public void setCreateTime(long time) { + pSpec.getPartitions().get(index).setCreateTime((int)time); + } + + } // static class Iterator; + +} Index: metastore/src/java/org/apache/hadoop/hive/metastore/partition/spec/PartitionSpecProxy.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/partition/spec/PartitionSpecProxy.java (revision ) +++ metastore/src/java/org/apache/hadoop/hive/metastore/partition/spec/PartitionSpecProxy.java (revision ) @@ -0,0 +1,160 @@ +package org.apache.hadoop.hive.metastore.partition.spec; + +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.PartitionSpec; + +import java.util.List; +import java.util.Map; + +/** + * Polymorphic proxy class, equivalent to org.apache.hadoop.hive.metastore.api.PartitionSpec. + */ +public abstract class PartitionSpecProxy { + + /** + * The number of Partition instances represented by the PartitionSpec. + * @return Number of partitions. + */ + public abstract int size(); + + /** + * Setter for name of the DB. + * @param dbName The name of the DB. + */ + public abstract void setDbName(String dbName); + + /** + * Setter for name of the table. + * @param tableName The name of the table. + */ + public abstract void setTableName(String tableName); + + /** + * Getter for name of the DB. + * @return The name of the DB. + */ + public abstract String getDbName(); + + /** + * Getter for name of the table. + * @return The name of the table. + */ + public abstract String getTableName(); + + /** + * Iterator to the (virtual) sequence of Partitions represented by the PartitionSpec. + * @return A PartitionIterator to the beginning of the Partition sequence. + */ + public abstract PartitionIterator getPartitionIterator(); + + /** + * Conversion to a org.apache.hadoop.hive.metastore.api.PartitionSpec sequence. + * @return A list of org.apache.hadoop.hive.metastore.api.PartitionSpec instances. + */ + public abstract List<PartitionSpec> toPartitionSpec(); + + /** + * Setter for the common root-location for all partitions in the PartitionSpec. + * @param rootLocation The new common root-location. + * @throws MetaException + */ + public abstract void setRootLocation(String rootLocation) throws MetaException; + + /** + * Factory to construct PartitionSpecProxy instances from raw PartitionSpecs. + */ + public static class Factory { + + /** + * Factory method. Construct PartitionSpecProxy from raw PartitionSpec. + * @param partSpec Raw PartitionSpec from the Thrift API. + * @return PartitionSpecProxy instance. + */ + public static PartitionSpecProxy get(PartitionSpec partSpec) { + + if (partSpec == null) { + return null; + } + else + if (partSpec.isSetPartitionList()) { + return new PartitionListComposingSpecProxy(partSpec); + } + else + if (partSpec.isSetSharedSDPartitionSpec()) { + return new PartitionSpecWithSharedSDProxy(partSpec); + } + + assert false : "Unsupported type of PartitionSpec!"; + return null; + } + + /** + * Factory method to construct CompositePartitionSpecProxy. + * @param partitionSpecs List of raw PartitionSpecs. + * @return A CompositePartitionSpecProxy instance.
*/ + public static PartitionSpecProxy get(List<PartitionSpec> partitionSpecs) { + return new CompositePartitionSpecProxy(partitionSpecs); + } + + } // class Factory; + + /** + * Iterator to iterate over Partitions corresponding to a PartitionSpec. + */ + public static interface PartitionIterator extends java.util.Iterator<Partition> { + + /** + * Getter for the Partition "pointed to" by the iterator. + * Like next(), but without advancing the iterator. + * @return The "current" partition object. + */ + public Partition getCurrent(); + + /** + * Getter for the name of the DB. + * @return Name of the DB. + */ + public String getDbName(); + + /** + * Getter for the name of the table. + * @return Name of the table. + */ + public String getTableName(); + + /** + * Getter for the Partition parameters. + * @return Key-value map for Partition-level parameters. + */ + public Map<String, String> getParameters(); + + /** + * Setter for Partition parameters. + * @param parameters Key-value map of Partition-level parameters. + */ + public void setParameters(Map<String, String> parameters); + + /** + * Insert an individual parameter to a Partition's parameter-set. + * @param key + * @param value + */ + public void putToParameters(String key, String value); + + /** + * Getter for Partition-location. + * @return Partition's location. + */ + public String getLocation(); + + /** + * Setter for creation-time of a Partition. + * @param time Timestamp indicating the time of creation of the Partition. + */ + public void setCreateTime(long time); + + } // class PartitionIterator; + +} // class PartitionSpecProxy; Index: metastore/src/java/org/apache/hadoop/hive/metastore/events/PreAddPartitionEvent.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/events/PreAddPartitionEvent.java (revision 1616367) +++ metastore/src/java/org/apache/hadoop/hive/metastore/events/PreAddPartitionEvent.java (revision ) @@ -21,19 +21,23 @@ import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; import java.util.Arrays; +import java.util.Iterator; import java.util.List; public class PreAddPartitionEvent extends PreEventContext { private final Table table; private final List<Partition> partitions; + private PartitionSpecProxy partitionSpecProxy; public PreAddPartitionEvent (Table table, List<Partition> partitions, HMSHandler handler) { super(PreEventType.ADD_PARTITION, handler); this.table = table; this.partitions = partitions; + this.partitionSpecProxy = null; } public PreAddPartitionEvent(Table table, Partition partition, HMSHandler handler) { @@ -41,6 +45,14 @@ } /** + * Alternative constructor, using a PartitionSpecProxy instead of a Partition list. + */ + public PreAddPartitionEvent(Table table, PartitionSpecProxy partitionSpecProxy, HMSHandler handler) { + this(table, (List<Partition>)null, handler); + this.partitionSpecProxy = partitionSpecProxy; + } + + /** * @return the partitions */ public List<Partition> getPartitions() { @@ -52,5 +64,12 @@ */ public Table getTable() { return table ; + } + + /** + * @return Iterator over partition-list. + */ + public Iterator<Partition> getPartitionIterator() { + return partitionSpecProxy == null ?
null : partitionSpecProxy.getPartitionIterator(); } } Index: metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (revision 1616367) +++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (revision ) @@ -47,6 +47,9 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableListMultimap; +import com.google.common.collect.Multimaps; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -111,6 +114,10 @@ import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.PartitionEventType; +import org.apache.hadoop.hive.metastore.api.PartitionListComposingSpec; +import org.apache.hadoop.hive.metastore.api.PartitionSpec; +import org.apache.hadoop.hive.metastore.api.PartitionSpecWithSharedSD; +import org.apache.hadoop.hive.metastore.api.PartitionWithoutSD; import org.apache.hadoop.hive.metastore.api.PartitionsByExprRequest; import org.apache.hadoop.hive.metastore.api.PartitionsByExprResult; import org.apache.hadoop.hive.metastore.api.PartitionsStatsRequest; @@ -127,6 +134,7 @@ import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; import org.apache.hadoop.hive.metastore.api.SkewedInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.TableStatsRequest; import org.apache.hadoop.hive.metastore.api.TableStatsResult; @@ -168,6 +176,7 @@ import org.apache.hadoop.hive.metastore.model.MRoleMap; import org.apache.hadoop.hive.metastore.model.MTableColumnPrivilege; import org.apache.hadoop.hive.metastore.model.MTablePrivilege; +import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; import org.apache.hadoop.hive.metastore.txn.TxnHandler; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.SerDeException; @@ -1804,6 +1813,52 @@ } } + private static class PartValEqWrapperLite { + List values; + String location; + + public PartValEqWrapperLite(Partition partition) { + this.values = partition.isSetValues()? partition.getValues() : null; + this.location = partition.getSd().getLocation(); + } + + @Override + public int hashCode() { + return values == null ? 
0 : values.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || !(obj instanceof PartValEqWrapperLite)) { + return false; + } + + List<String> lhsValues = this.values; + List<String> rhsValues = ((PartValEqWrapperLite)obj).values; + + if (lhsValues == null || rhsValues == null) + return lhsValues == rhsValues; + + if (lhsValues.size() != rhsValues.size()) + return false; + + for (int i=0; i<lhsValues.size(); ++i) { + String lhsValue = lhsValues.get(i); + String rhsValue = rhsValues.get(i); + + if ((lhsValue == null && rhsValue != null) + || (lhsValue != null && !lhsValue.equals(rhsValue))) { + return false; + } + } + + return true; + } + } + private List<Partition> add_partitions_core( RawStore ms, String dbName, String tblName, List<Partition> parts, boolean ifNotExists) throws MetaException, InvalidObjectException, AlreadyExistsException, TException { @@ -1931,6 +1986,86 @@ return ret; } + @Override + public int add_partitions_pspec(final List<PartitionSpec> partSpecs) + throws TException { + + if (partSpecs.isEmpty()) { + return 0; + } + + String dbName = partSpecs.get(0).getDbName(); + String tableName = partSpecs.get(0).getTableName(); + + return add_partitions_pspec_core(getMS(), dbName, tableName, partSpecs, false); + } + + private int add_partitions_pspec_core( + RawStore ms, String dbName, String tblName, List<PartitionSpec> partSpecs, boolean ifNotExists) + throws TException { + logInfo("add_partitions"); + boolean success = false; + // Ensures that the list doesn't have dups, and keeps track of directories we have created. + Map<PartValEqWrapperLite, Boolean> addedPartitions = new HashMap<PartValEqWrapperLite, Boolean>(); + // PartitionSpecProxy partitionSpecProxy = PartitionSpecProxy.Factory.get(dbName, tblName, partSpecs); + PartitionSpecProxy partitionSpecProxy = PartitionSpecProxy.Factory.get(partSpecs); + PartitionSpecProxy.PartitionIterator partitionIterator = partitionSpecProxy.getPartitionIterator(); + Table tbl = null; + try { + ms.openTransaction(); + tbl = ms.getTable(dbName, tblName); + if (tbl == null) { + throw new InvalidObjectException("Unable to add partitions because " + + "database or table " + dbName + "." + tblName + " does not exist"); + } + + firePreEvent(new PreAddPartitionEvent(tbl, partitionSpecProxy, this)); + + int nPartitions = 0; + while(partitionIterator.hasNext()) { + + Partition part = partitionIterator.getCurrent(); + + if (!part.getTableName().equals(tblName) || !part.getDbName().equals(dbName)) { + throw new MetaException("Partition does not belong to target table " + + dbName + "." + tblName + ": " + part); + } + boolean shouldAdd = startAddPartition(ms, part, ifNotExists); + if (!shouldAdd) { + LOG.info("Not adding partition " + part + " as it already exists"); + partitionIterator.next(); // Advance past the existing partition, lest the loop never terminate. + continue; + } + boolean madeDir = createLocationForAddedPartition(tbl, part); + if (addedPartitions.put(new PartValEqWrapperLite(part), madeDir) != null) { + // Technically, for ifNotExists case, we could insert one and discard the other + // because the first one now "exists", but it seems better to report the problem + // upstream as such a command doesn't make sense.
+ throw new MetaException("Duplicate partitions in the list: " + part); + } + initializeAddedPartition(tbl, partitionIterator, madeDir); + + ++nPartitions; + partitionIterator.next(); + } + + success = ms.addPartitions(dbName, tblName, partitionSpecProxy, ifNotExists) + && ms.commitTransaction(); + + return nPartitions; + } finally { + if (!success) { + ms.rollbackTransaction(); + for (Entry e : addedPartitions.entrySet()) { + if (e.getValue()) { + wh.deleteDir(new Path(e.getKey().location), true); + // we just created this directory - it's not a case of pre-creation, so we nuke + } + } + } + fireMetaStoreAddPartitionEvent(tbl, partitionSpecProxy, null, true); + } + } + private boolean startAddPartition( RawStore ms, Partition part, boolean ifNotExists) throws MetaException, TException { MetaStoreUtils.validatePartitionNameCharacters(part.getValues(), @@ -2022,6 +2157,39 @@ } } + private void initializeAddedPartition( + final Table tbl, final PartitionSpecProxy.PartitionIterator partIterator, boolean madeDir) throws MetaException { + if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVESTATSAUTOGATHER) && + !MetaStoreUtils.isView(tbl)) { + MetaStoreUtils.updatePartitionStatsFast(partIterator, wh, madeDir, false); + } + + // set create time + long time = System.currentTimeMillis() / 1000; + partIterator.setCreateTime((int) time); + if (partIterator.getParameters() == null || + partIterator.getParameters().get(hive_metastoreConstants.DDL_TIME) == null) { + partIterator.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(time)); + } + + // Inherit table properties into partition properties. + Map tblParams = tbl.getParameters(); + String inheritProps = hiveConf.getVar(ConfVars.METASTORE_PART_INHERIT_TBL_PROPS).trim(); + // Default value is empty string in which case no properties will be inherited. 
+ // * implies all properties need to be inherited + Set<String> inheritKeys = new HashSet<String>(Arrays.asList(inheritProps.split(","))); + if (inheritKeys.contains("*")) { + inheritKeys = tblParams.keySet(); + } + + for (String key : inheritKeys) { + String paramVal = tblParams.get(key); + if (null != paramVal) { // add the property only if it exists in table properties + partIterator.putToParameters(key, paramVal); + } + } + } + private Partition add_partition_core(final RawStore ms, final Partition part, final EnvironmentContext envContext) throws InvalidObjectException, AlreadyExistsException, MetaException, TException { @@ -2074,6 +2242,20 @@ } } + private void fireMetaStoreAddPartitionEvent(final Table tbl, + final PartitionSpecProxy partitionSpec, final EnvironmentContext envContext, boolean success) + throws MetaException { + if (tbl != null && partitionSpec != null) { + AddPartitionEvent addPartitionEvent = + new AddPartitionEvent(tbl, partitionSpec, success, this); + addPartitionEvent.setEnvironmentContext(envContext); + + for (MetaStoreEventListener listener : listeners) { + listener.onAddPartition(addPartitionEvent); + } + } + } + @Override public Partition add_partition(final Partition part) throws InvalidObjectException, AlreadyExistsException, MetaException { @@ -2534,6 +2716,155 @@ } @Override + public List<PartitionSpec> get_partitions_pspec(final String db_name, final String tbl_name, final int max_parts) + throws NoSuchObjectException, MetaException { + + startTableFunction("get_partitions_pspec", db_name, tbl_name); + + List<PartitionSpec> partitionSpecs = null; + try { + Table table = get_table(db_name, tbl_name); + List<Partition> partitions = get_partitions(db_name, tbl_name, (short) max_parts); + + if (is_partition_spec_grouping_enabled(table)) { + partitionSpecs = get_partitionspecs_grouped_by_storage_descriptor(table, partitions); + } + else { + PartitionSpec pSpec = new PartitionSpec(); + pSpec.setPartitionList(new PartitionListComposingSpec(partitions)); + partitionSpecs = Arrays.asList(pSpec); + } + + return partitionSpecs; + } + finally { + endFunction("get_partitions_pspec", partitionSpecs != null && !partitionSpecs.isEmpty(), null, tbl_name); + } + } + + private static class StorageDescriptorKey { + + private StorageDescriptor sd; + + StorageDescriptorKey(StorageDescriptor sd) { this.sd = sd; } + + StorageDescriptor getSd() { + return sd; + } + + private String hashCodeKey() { + return sd.getInputFormat() + "\t" + + sd.getOutputFormat() + "\t" + + sd.getSerdeInfo().getSerializationLib() + "\t" + + sd.getCols(); + } + + @Override + public int hashCode() { + return hashCodeKey().hashCode(); + } + + @Override + public boolean equals(Object rhs) { + if (rhs == this) + return true; + + if (!(rhs instanceof StorageDescriptorKey)) + return false; + + return (hashCodeKey().equals(((StorageDescriptorKey) rhs).hashCodeKey())); + } + } + + private List<PartitionSpec> get_partitionspecs_grouped_by_storage_descriptor(Table table, List<Partition> partitions) + throws NoSuchObjectException, MetaException { + + assert is_partition_spec_grouping_enabled(table); + + final String tablePath = table.getSd().getLocation(); + + ImmutableListMultimap<Boolean, Partition> partitionsWithinTableDirectory + = Multimaps.index(partitions, new com.google.common.base.Function<Partition, Boolean>() { + + @Override + public Boolean apply(Partition input) { + return input.getSd().getLocation().startsWith(tablePath); + } + }); + + List<PartitionSpec> partSpecs = new ArrayList<PartitionSpec>(); + + // Classify partitions within the table directory into groups, + // based on shared SD properties.
+ + Map<StorageDescriptorKey, List<PartitionWithoutSD>> sdToPartList + = new HashMap<StorageDescriptorKey, List<PartitionWithoutSD>>(); + + if (partitionsWithinTableDirectory.containsKey(true)) { + + ImmutableList<Partition> partsWithinTableDir = partitionsWithinTableDirectory.get(true); + for (Partition partition : partsWithinTableDir) { + + PartitionWithoutSD partitionWithoutSD + = new PartitionWithoutSD( partition.getValues(), + partition.getCreateTime(), + partition.getLastAccessTime(), + partition.getSd().getLocation().substring(tablePath.length()), partition.getParameters()); + + StorageDescriptorKey sdKey = new StorageDescriptorKey(partition.getSd()); + if (!sdToPartList.containsKey(sdKey)) { + sdToPartList.put(sdKey, new ArrayList<PartitionWithoutSD>()); + } + + sdToPartList.get(sdKey).add(partitionWithoutSD); + + } // for (partitionsWithinTableDirectory); + + for (Map.Entry<StorageDescriptorKey, List<PartitionWithoutSD>> entry : sdToPartList.entrySet()) { + partSpecs.add(getSharedSDPartSpec(table, entry.getKey(), entry.getValue())); + } + + } // Done grouping partitions within table-dir. + + // Lump all partitions outside the tablePath into one PartSpec. + if (partitionsWithinTableDirectory.containsKey(false)) { + List<Partition> partitionsOutsideTableDir = partitionsWithinTableDirectory.get(false); + if (!partitionsOutsideTableDir.isEmpty()) { + PartitionSpec partListSpec = new PartitionSpec(); + partListSpec.setDbName(table.getDbName()); + partListSpec.setTableName(table.getTableName()); + partListSpec.setPartitionList(new PartitionListComposingSpec(partitionsOutsideTableDir)); + partSpecs.add(partListSpec); + } + + } + return partSpecs; + } + + private PartitionSpec getSharedSDPartSpec(Table table, StorageDescriptorKey sdKey, List<PartitionWithoutSD> partitions) { + + StorageDescriptor sd = new StorageDescriptor(sdKey.getSd()); + sd.setLocation(table.getSd().getLocation()); // Use table-dir as root-dir. + PartitionSpecWithSharedSD sharedSDPartSpec = + new PartitionSpecWithSharedSD(partitions, sd); + + PartitionSpec ret = new PartitionSpec(); + ret.setRootPath(sd.getLocation()); + ret.setSharedSDPartitionSpec(sharedSDPartSpec); + ret.setDbName(table.getDbName()); + ret.setTableName(table.getTableName()); + + return ret; + } + + private static boolean is_partition_spec_grouping_enabled(Table table) { + + Map<String, String> parameters = table.getParameters(); + return parameters.containsKey("hive.hcatalog.partition.spec.grouping.enabled") + && parameters.get("hive.hcatalog.partition.spec.grouping.enabled").equalsIgnoreCase("true"); + } + + @Override public List<String> get_partition_names(final String db_name, final String tbl_name, final short max_parts) throws MetaException { startTableFunction("get_partition_names", db_name, tbl_name); @@ -3736,6 +4067,37 @@ endFunction("get_partitions_by_filter", ret != null, ex, tblName); } return ret; + } + + @Override + public List<PartitionSpec> get_part_specs_by_filter(final String dbName, + final String tblName, final String filter, final int maxParts) + throws MetaException, NoSuchObjectException, TException { + + startTableFunction("get_partitions_by_filter_pspec", dbName, tblName); + + List<PartitionSpec> partitionSpecs = null; + try { + Table table = get_table(dbName, tblName); + List<Partition> partitions = get_partitions_by_filter(dbName, tblName, filter, (short) maxParts); + + if (is_partition_spec_grouping_enabled(table)) { + partitionSpecs = get_partitionspecs_grouped_by_storage_descriptor(table, partitions); + } + else { + PartitionSpec pSpec = new PartitionSpec(); + pSpec.setPartitionList(new PartitionListComposingSpec(partitions)); + pSpec.setRootPath(table.getSd().getLocation()); + pSpec.setDbName(dbName); + pSpec.setTableName(tblName); + partitionSpecs = Arrays.asList(pSpec); + } + +
return partitionSpecs; + } + finally { + endFunction("get_partitions_by_filter_pspec", partitionSpecs != null && !partitionSpecs.isEmpty(), null, tblName); + } } @Override Index: metastore/src/java/org/apache/hadoop/hive/metastore/partition/spec/CompositePartitionSpecProxy.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/partition/spec/CompositePartitionSpecProxy.java (revision ) +++ metastore/src/java/org/apache/hadoop/hive/metastore/partition/spec/CompositePartitionSpecProxy.java (revision ) @@ -0,0 +1,210 @@ +package org.apache.hadoop.hive.metastore.partition.spec; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.PartitionSpec; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Implementation of PartitionSpecProxy that composes a list of PartitionSpecProxy. + */ +public class CompositePartitionSpecProxy extends PartitionSpecProxy { + + private String dbName; + private String tableName; + private List partitionSpecs; + private List partitionSpecProxies; + private int size = 0; + + protected CompositePartitionSpecProxy(List partitionSpecs) { + this.partitionSpecs = partitionSpecs; + if (partitionSpecs.isEmpty()) { + dbName = null; + tableName = null; + } + else { + dbName = partitionSpecs.get(0).getDbName(); + tableName = partitionSpecs.get(0).getTableName(); + this.partitionSpecProxies = new ArrayList(partitionSpecs.size()); + for (PartitionSpec partitionSpec : partitionSpecs) { + PartitionSpecProxy partitionSpecProxy = Factory.get(partitionSpec); + this.partitionSpecProxies.add(partitionSpecProxy); + size += partitionSpecProxy.size(); + } + } + // Assert class-invariant. + assert isValid() : "Invalid CompositePartitionSpecProxy!"; + } + + protected CompositePartitionSpecProxy(String dbName, String tableName, List partitionSpecs) { + this.dbName = dbName; + this.tableName = tableName; + this.partitionSpecs = partitionSpecs; + this.partitionSpecProxies = new ArrayList(partitionSpecs.size()); + for (PartitionSpec partitionSpec : partitionSpecs) { + this.partitionSpecProxies.add(PartitionSpecProxy.Factory.get(partitionSpec)); + } + // Assert class-invariant. + assert isValid() : "Invalid CompositePartitionSpecProxy!"; + } + + private boolean isValid() { + for (PartitionSpecProxy partitionSpecProxy : partitionSpecProxies) { + if (partitionSpecProxy instanceof CompositePartitionSpecProxy) { + return false; + } + } + + return true; + } + + @Override + public int size() { + return size; + } + + /** + * Iterator to iterate over all Partitions, across all PartitionSpecProxy instances within the Composite. + */ + public static class Iterator implements PartitionIterator { + + private CompositePartitionSpecProxy composite; + private List partitionSpecProxies; + private int index = -1; // Index into partitionSpecs. 
+ private PartitionIterator iterator = null; + + public Iterator(CompositePartitionSpecProxy composite) { + this.composite = composite; + this.partitionSpecProxies = composite.partitionSpecProxies; + + if (this.partitionSpecProxies != null && !this.partitionSpecProxies.isEmpty()) { + this.index = 0; + this.iterator = this.partitionSpecProxies.get(this.index).getPartitionIterator(); + } + } + + @Override + public boolean hasNext() { + + if (iterator == null) { + return false; + } + + if (iterator.hasNext()) { + return true; + } + + while ( ++index < partitionSpecProxies.size() + && !(iterator = partitionSpecProxies.get(index).getPartitionIterator()).hasNext()); + + return index < partitionSpecProxies.size() && iterator.hasNext(); + + } + + @Override + public Partition next() { + + if (iterator.hasNext()) + return iterator.next(); + + while (++index < partitionSpecProxies.size() + && !(iterator = partitionSpecProxies.get(index).getPartitionIterator()).hasNext()); + + return index == partitionSpecProxies.size()? null : iterator.next(); + + } + + @Override + public void remove() { + iterator.remove(); + } + + @Override + public Partition getCurrent() { + return iterator.getCurrent(); + } + + @Override + public String getDbName() { + return composite.dbName; + } + + @Override + public String getTableName() { + return composite.tableName; + } + + @Override + public Map getParameters() { + return iterator.getParameters(); + } + + @Override + public void setParameters(Map parameters) { + iterator.setParameters(parameters); + } + + @Override + public String getLocation() { + return iterator.getLocation(); + } + + @Override + public void putToParameters(String key, String value) { + iterator.putToParameters(key, value); + } + + @Override + public void setCreateTime(long time) { + iterator.setCreateTime(time); + } + } + + @Override + public void setDbName(String dbName) { + this.dbName = dbName; + for (PartitionSpecProxy partSpecProxy : partitionSpecProxies) { + partSpecProxy.setDbName(dbName); + } + } + + @Override + public void setTableName(String tableName) { + this.tableName = tableName; + for (PartitionSpecProxy partSpecProxy : partitionSpecProxies) { + partSpecProxy.setTableName(tableName); + } + } + + @Override + public String getDbName() { + return dbName; + } + + @Override + public String getTableName() { + return tableName; + } + + @Override + public PartitionIterator getPartitionIterator() { + return new Iterator(this); + } + + @Override + public List toPartitionSpec() { + return partitionSpecs; + } + + @Override + public void setRootLocation(String rootLocation) throws MetaException { + for (PartitionSpecProxy partSpecProxy : partitionSpecProxies) { + partSpecProxy.setRootLocation(rootLocation); + } + } +} Index: metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStorePartitionSpecs.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStorePartitionSpecs.java (revision ) +++ metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStorePartitionSpecs.java (revision ) @@ -0,0 +1,399 @@ +package org.apache.hadoop.hive.metastore; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; +import 
org.apache.hadoop.hive.metastore.api.PartitionSpec; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.partition.spec.CompositePartitionSpecProxy; +import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; +import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe; +import org.apache.hadoop.util.ExitUtil; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.security.Permission; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Test to check PartitionSpec support in HiveMetaStore. + */ +public class TestHiveMetaStorePartitionSpecs { + + private static final Logger LOG = LoggerFactory.getLogger(TestHiveMetaStorePartitionSpecs.class); + private static final String msPort = "20102"; + private static HiveConf hiveConf; + private static SecurityManager securityManager; + + public static class NoExitSecurityManager extends SecurityManager { + + @Override + public void checkPermission(Permission perm) { + // allow anything. + } + + @Override + public void checkPermission(Permission perm, Object context) { + // allow anything. + } + + @Override + public void checkExit(int status) { + + super.checkExit(status); + throw new ExitUtil.ExitException(status, "System.exit() was called. Raising exception. "); + } + } + + private static class RunMS implements Runnable { + + @Override + public void run() { + try { + HiveMetaStore.main(new String[]{"-v", "-p", msPort, "--hiveconf", + "hive.metastore.expression.proxy=" + MockPartitionExpressionForMetastore.class.getCanonicalName()}); + } catch (Throwable t) { + LOG.error("Exiting. 
Got exception from metastore: ", t); + } + } + } + + @AfterClass + public static void tearDown() throws Exception { + LOG.info("Shutting down metastore."); + System.setSecurityManager(securityManager); + } + + @BeforeClass + public static void startMetaStoreServer() throws Exception { + + Thread t = new Thread(new RunMS()); + t.start(); + Thread.sleep(5000); + + securityManager = System.getSecurityManager(); + System.setSecurityManager(new NoExitSecurityManager()); + hiveConf = new HiveConf(TestHiveMetaStorePartitionSpecs.class); + hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + + msPort); + hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3); + hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, + "false"); + hiveConf.set(HiveConf.ConfVars.METASTORE_EXPRESSION_PROXY_CLASS.name(), MockPartitionExpressionForMetastore.class.getCanonicalName()); + System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " "); + System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " "); + } + + private static String dbName = "testpartitionspecs_db"; + private static String tableName = "testpartitionspecs_table"; + private static int nDates = 10; + private static String datePrefix = "2014010"; + + private static void createTable(HiveMetaStoreClient hmsc, boolean enablePartitionGrouping) throws Exception { + + + List columns = new ArrayList(); + columns.add(new FieldSchema("foo", "string", "")); + columns.add(new FieldSchema("bar", "string", "")); + + List partColumns = new ArrayList(); + partColumns.add(new FieldSchema("dt", "string", "")); + partColumns.add(new FieldSchema("blurb", "string", "")); + + SerDeInfo serdeInfo = new SerDeInfo("LBCSerDe", LazyBinaryColumnarSerDe.class.getCanonicalName(), new HashMap()); + + StorageDescriptor storageDescriptor + = new StorageDescriptor(columns, null, + "org.apache.hadoop.hive.ql.io.RCFileInputFormat", + "org.apache.hadoop.hive.ql.io.RCFileOutputFormat", + false, 0, serdeInfo, null, null, null); + + Map tableParameters = new HashMap(); + tableParameters.put("hive.hcatalog.partition.spec.grouping.enabled", enablePartitionGrouping? "true":"false"); + Table table = new Table(tableName, dbName, "", 0, 0, 0, storageDescriptor, partColumns, tableParameters, "", "", ""); + + hmsc.createTable(table); + + } + + private static void clearAndRecreateDB(HiveMetaStoreClient hmsc) throws Exception { + hmsc.dropDatabase(dbName, + true, // Delete data. + true, // Ignore unknownDB. + true // Cascade. + ); + + hmsc.createDatabase(new Database(dbName, + "", // Description. + null, // Location. + null // Parameters. + )); + } + + // Get partition-path. For grid='XYZ', place the partition outside the table-path. + private static String getPartitionPath(Table table, List partValues) { + + return partValues.get(1).equalsIgnoreCase("isLocatedOutsideTablePath")? // i.e. Is the partition outside the table-dir? + table.getSd().getLocation().replace(table.getTableName(), "location_outside_" + table.getTableName()) + + "_" + partValues.get(0) + "_" + partValues.get(1) + : null ; // Use defaults... Partitions are put in the table directory. 
+ + } + + private static void populatePartitions(HiveMetaStoreClient hmsc, Table table, List<String> blurbs) throws Exception { + for (int i=0; i< nDates; ++i) { + for (String blurb : blurbs) { + StorageDescriptor sd = new StorageDescriptor(table.getSd()); + // Add partitions located in the table-directory (i.e. default). + List<String> values = Arrays.asList(datePrefix + i, blurb); + sd.setLocation(getPartitionPath(table, values)); + hmsc.add_partition(new Partition(values, dbName, tableName, 0, 0, sd, null)); + } + } + } + + private void testGetPartitionSpecs(boolean enablePartitionGrouping) { + try { + HiveMetaStoreClient hmsc = new HiveMetaStoreClient(hiveConf); + clearAndRecreateDB(hmsc); + createTable(hmsc, enablePartitionGrouping); + Table table = hmsc.getTable(dbName, tableName); + populatePartitions(hmsc, table, Arrays.asList("isLocatedInTablePath", "isLocatedOutsideTablePath")); + + PartitionSpecProxy partitionSpecProxy = hmsc.listPartitionSpecs(dbName, tableName, -1); + Assert.assertEquals( "Unexpected number of partitions.", nDates * 2, partitionSpecProxy.size()); + + Map<String, List<String>> locationToDateMap = new HashMap<String, List<String>>(); + locationToDateMap.put("isLocatedInTablePath", new ArrayList<String>()); + locationToDateMap.put("isLocatedOutsideTablePath", new ArrayList<String>()); + PartitionSpecProxy.PartitionIterator iterator = partitionSpecProxy.getPartitionIterator(); + + while (iterator.hasNext()) { + Partition partition = iterator.next(); + locationToDateMap.get(partition.getValues().get(1)).add(partition.getValues().get(0)); + } + + List<String> expectedDates = new ArrayList<String>(nDates); + for (int i=0; i fields = table.getSd().getCols(); + fields.add(new FieldSchema("goo", "string", "Entirely new column. Doesn't apply to older partitions.")); + table.getSd().setCols(fields); + hmsc.alter_table(dbName, tableName, table); + // Check that the change stuck. + table = hmsc.getTable(dbName,tableName); + Assert.assertEquals("Unexpected number of table columns.", + 3, table.getSd().getColsSize()); + + // Add partitions with new schema. + // Mark Partitions with new schema with different blurb. + populatePartitions(hmsc, table, Arrays.asList("hasNewColumn")); + + // Retrieve *all* partitions from the table. + PartitionSpecProxy partitionSpecProxy = hmsc.listPartitionSpecs(dbName, tableName, -1); + Assert.assertEquals("Unexpected number of partitions.", nDates * 3, partitionSpecProxy.size()); + + // Confirm grouping. + Assert.assertTrue("Unexpected type of PartitionSpecProxy.", partitionSpecProxy instanceof CompositePartitionSpecProxy); + CompositePartitionSpecProxy compositePartitionSpecProxy = (CompositePartitionSpecProxy)partitionSpecProxy; + List<PartitionSpec> partitionSpecs = compositePartitionSpecProxy.toPartitionSpec(); + Assert.assertTrue("PartitionSpec[0] should have been a SharedSDPartitionSpec.", + partitionSpecs.get(0).isSetSharedSDPartitionSpec()); + Assert.assertEquals("PartitionSpec[0] should use the table-path as the common root location. ", + table.getSd().getLocation(), partitionSpecs.get(0).getRootPath()); + Assert.assertTrue("PartitionSpec[1] should have been a SharedSDPartitionSpec.", + partitionSpecs.get(1).isSetSharedSDPartitionSpec()); + Assert.assertEquals("PartitionSpec[1] should use the table-path as the common root location. ", + table.getSd().getLocation(), partitionSpecs.get(1).getRootPath()); + Assert.assertTrue("PartitionSpec[2] should have been a ListComposingPartitionSpec.", + partitionSpecs.get(2).isSetPartitionList()); + + // Categorize the partitions returned, and confirm that all partitions are accounted for.
+      PartitionSpecProxy.PartitionIterator iterator = partitionSpecProxy.getPartitionIterator();
+      Map<String, List<Partition>> blurbToPartitionList = new HashMap<String, List<Partition>>(3);
+      while (iterator.hasNext()) {
+
+        Partition partition = iterator.next();
+        String blurb = partition.getValues().get(1);
+
+        if (!blurbToPartitionList.containsKey(blurb)) {
+          blurbToPartitionList.put(blurb, new ArrayList<Partition>(nDates));
+        }
+
+        blurbToPartitionList.get(blurb).add(partition);
+
+      }
+
+      // All partitions with blurb="isLocatedOutsideTablePath" should have 2 columns,
+      // and must have locations outside the table directory.
+      for (Partition partition : blurbToPartitionList.get("isLocatedOutsideTablePath")) {
+        Assert.assertEquals("Unexpected number of columns.", 2, partition.getSd().getCols().size());
+        Assert.assertEquals("Unexpected first column.", "foo", partition.getSd().getCols().get(0).getName());
+        Assert.assertEquals("Unexpected second column.", "bar", partition.getSd().getCols().get(1).getName());
+        String partitionLocation = partition.getSd().getLocation();
+        String tableLocation = table.getSd().getLocation();
+        Assert.assertTrue("Unexpected partition location: " + partitionLocation + ". "
+            + "Partition should have been outside table location: " + tableLocation,
+            !partitionLocation.startsWith(tableLocation));
+      }
+
+      // All partitions with blurb="isLocatedInTablePath" should have 2 columns,
+      // and must have locations within the table directory.
+      for (Partition partition : blurbToPartitionList.get("isLocatedInTablePath")) {
+        Assert.assertEquals("Unexpected number of columns.", 2, partition.getSd().getCols().size());
+        Assert.assertEquals("Unexpected first column.", "foo", partition.getSd().getCols().get(0).getName());
+        Assert.assertEquals("Unexpected second column.", "bar", partition.getSd().getCols().get(1).getName());
+        String partitionLocation = partition.getSd().getLocation();
+        String tableLocation = table.getSd().getLocation();
+        Assert.assertTrue("Unexpected partition location: " + partitionLocation + ". "
+            + "Partition should have been within table location: " + tableLocation,
+            partitionLocation.startsWith(tableLocation));
+      }
+
+      // All partitions with blurb="hasNewColumn" were added after the table schema changed,
+      // and must have 3 columns. Also, the partition locations must lie within the table directory.
+      for (Partition partition : blurbToPartitionList.get("hasNewColumn")) {
+        Assert.assertEquals("Unexpected number of columns.", 3, partition.getSd().getCols().size());
+        Assert.assertEquals("Unexpected first column.", "foo", partition.getSd().getCols().get(0).getName());
+        Assert.assertEquals("Unexpected second column.", "bar", partition.getSd().getCols().get(1).getName());
+        Assert.assertEquals("Unexpected third column.", "goo", partition.getSd().getCols().get(2).getName());
+        String partitionLocation = partition.getSd().getLocation();
+        String tableLocation = table.getSd().getLocation();
+        Assert.assertTrue("Unexpected partition location: " + partitionLocation + ". "
" + + "Partition should have been within table location: " + tableLocation, + partitionLocation.startsWith(tableLocation)); + } + + } + catch (Throwable t) { + LOG.error("Unexpected Exception!", t); + t.printStackTrace(); + Assert.assertTrue("Unexpected Exception!", false); + } + } + +} Index: metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java (revision 1616367) +++ metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java (revision ) @@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.regex.Pattern; import javax.jdo.JDODataStoreException; import javax.jdo.JDOHelper; @@ -60,6 +61,7 @@ import org.apache.hadoop.hive.common.classification.InterfaceStability; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; @@ -128,6 +130,7 @@ import org.apache.hadoop.hive.metastore.parser.ExpressionTree.Operator; import org.apache.hadoop.hive.metastore.parser.FilterLexer; import org.apache.hadoop.hive.metastore.parser.FilterParser; +import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; import org.apache.hadoop.util.StringUtils; import org.apache.thrift.TException; import org.datanucleus.store.rdbms.exceptions.MissingTableException; @@ -178,6 +181,8 @@ private TXN_STATUS transactionStatus = TXN_STATUS.NO_STATE; private final AtomicBoolean isSchemaVerified = new AtomicBoolean(false); + private Pattern partitionValidationPattern; + public ObjectStore() { } @@ -222,6 +227,14 @@ initialize(propsFromConf); + String partitionValidationRegex = + hiveConf.get(HiveConf.ConfVars.METASTORE_PARTITION_NAME_WHITELIST_PATTERN.name()); + if (partitionValidationRegex != null && partitionValidationRegex.equals("")) { + partitionValidationPattern = Pattern.compile(partitionValidationRegex); + } else { + partitionValidationPattern = null; + } + if (!isInitialized) { throw new RuntimeException( "Unable to create persistence manager. 
Index: metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java (revision 1616367)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java (revision )
@@ -37,6 +37,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Pattern;

 import javax.jdo.JDODataStoreException;
 import javax.jdo.JDOHelper;
@@ -60,6 +61,7 @@
 import org.apache.hadoop.hive.common.classification.InterfaceStability;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
@@ -128,6 +130,7 @@
 import org.apache.hadoop.hive.metastore.parser.ExpressionTree.Operator;
 import org.apache.hadoop.hive.metastore.parser.FilterLexer;
 import org.apache.hadoop.hive.metastore.parser.FilterParser;
+import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.thrift.TException;
 import org.datanucleus.store.rdbms.exceptions.MissingTableException;
@@ -178,6 +181,8 @@
   private TXN_STATUS transactionStatus = TXN_STATUS.NO_STATE;
   private final AtomicBoolean isSchemaVerified = new AtomicBoolean(false);

+  private Pattern partitionValidationPattern;
+
   public ObjectStore() {
   }
@@ -222,6 +227,14 @@

     initialize(propsFromConf);

+    String partitionValidationRegex =
+        hiveConf.get(HiveConf.ConfVars.METASTORE_PARTITION_NAME_WHITELIST_PATTERN.name());
+    if (partitionValidationRegex != null && !partitionValidationRegex.isEmpty()) {
+      partitionValidationPattern = Pattern.compile(partitionValidationRegex);
+    } else {
+      partitionValidationPattern = null;
+    }
+
     if (!isInitialized) {
       throw new RuntimeException(
           "Unable to create persistence manager. Check dss.log for details");
@@ -1265,6 +1278,80 @@
       }
       if (toPersist.size() > 0) {
         pm.makePersistentAll(toPersist);
+      }
+
+      success = commitTransaction();
+    } finally {
+      if (!success) {
+        rollbackTransaction();
+      }
+    }
+    return success;
+  }
+
+  private boolean isValidPartition(
+      Partition part, boolean ifNotExists) throws MetaException {
+    MetaStoreUtils.validatePartitionNameCharacters(part.getValues(),
+        partitionValidationPattern);
+    boolean doesExist = doesPartitionExist(
+        part.getDbName(), part.getTableName(), part.getValues());
+    if (doesExist && !ifNotExists) {
+      throw new MetaException("Partition already exists: " + part);
+    }
+    return !doesExist;
+  }
+
+  @Override
+  public boolean addPartitions(String dbName, String tblName,
+      PartitionSpecProxy partitionSpec, boolean ifNotExists)
+      throws InvalidObjectException, MetaException {
+    boolean success = false;
+    openTransaction();
+    try {
+      List<MTablePrivilege> tabGrants = null;
+      List<MTableColumnPrivilege> tabColumnGrants = null;
+      MTable table = this.getMTable(dbName, tblName);
+      if ("TRUE".equalsIgnoreCase(table.getParameters().get("PARTITION_LEVEL_PRIVILEGE"))) {
+        tabGrants = this.listAllTableGrants(dbName, tblName);
+        tabColumnGrants = this.listTableAllColumnGrants(dbName, tblName);
+      }
+
+      if (!partitionSpec.getTableName().equals(tblName) || !partitionSpec.getDbName().equals(dbName)) {
+        throw new MetaException("Partition does not belong to target table "
+            + dbName + "." + tblName + ": " + partitionSpec);
+      }
+
+      PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator();
+
+      int now = (int)(System.currentTimeMillis()/1000);
+
+      while (iterator.hasNext()) {
+        Partition part = iterator.next();
+
+        if (isValidPartition(part, ifNotExists)) {
+          MPartition mpart = convertToMPart(part, true);
+          pm.makePersistent(mpart);
+          if (tabGrants != null) {
+            for (MTablePrivilege tab : tabGrants) {
+              pm.makePersistent(new MPartitionPrivilege(tab.getPrincipalName(),
+                  tab.getPrincipalType(), mpart, tab.getPrivilege(), now,
+                  tab.getGrantor(), tab.getGrantorType(), tab.getGrantOption()));
+            }
+          }
+
+          if (tabColumnGrants != null) {
+            for (MTableColumnPrivilege col : tabColumnGrants) {
+              pm.makePersistent(new MPartitionColumnPrivilege(col.getPrincipalName(),
+                  col.getPrincipalType(), mpart, col.getColumnName(), col.getPrivilege(),
+                  now, col.getGrantor(), col.getGrantorType(), col.getGrantOption()));
+            }
+          }
+        }
       }

       success = commitTransaction();
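A note on the partition-name whitelist wired up above: each partition value is checked against METASTORE_PARTITION_NAME_WHITELIST_PATTERN via MetaStoreUtils.validatePartitionNameCharacters before anything is persisted. The matching semantics can be illustrated with a stand-alone sketch; the pattern below is made up for the example, the real one comes from configuration.

    // Values that fail the whitelist pattern cause addPartitions() to throw MetaException.
    java.util.regex.Pattern whitelist =
        java.util.regex.Pattern.compile("[\\x20-\\x7E&&[^,]]*"); // Hypothetical: printable ASCII, no commas.
    for (String value : java.util.Arrays.asList("20140101", "bad,value")) {
      boolean accepted = whitelist.matcher(value).matches();
      System.out.println(value + (accepted ? ": accepted" : ": rejected (would raise MetaException)"));
    }

Note also that the original condition compiled the pattern only when the configured regex was the empty string; the hunk above inverts that test so a non-empty regex activates validation.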
Index: metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java (revision 1616367)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java (revision )
@@ -55,6 +55,7 @@
 import org.apache.hadoop.hive.metastore.model.MRoleMap;
 import org.apache.hadoop.hive.metastore.model.MTableColumnPrivilege;
 import org.apache.hadoop.hive.metastore.model.MTablePrivilege;
+import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
 import org.apache.thrift.TException;

 public interface RawStore extends Configurable {
@@ -126,6 +127,9 @@
       throws InvalidObjectException, MetaException;

   public abstract boolean addPartitions(String dbName, String tblName, List<Partition> parts)
       throws InvalidObjectException, MetaException;
+
+  public abstract boolean addPartitions(String dbName, String tblName,
+      PartitionSpecProxy partitionSpec, boolean ifNotExists)
+      throws InvalidObjectException, MetaException;

   public abstract Partition getPartition(String dbName, String tableName,
Index: metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java (revision 1616367)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java (revision )
@@ -97,6 +97,7 @@
 import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PartitionEventType;
+import org.apache.hadoop.hive.metastore.api.PartitionSpec;
 import org.apache.hadoop.hive.metastore.api.PartitionsByExprRequest;
 import org.apache.hadoop.hive.metastore.api.PartitionsByExprResult;
 import org.apache.hadoop.hive.metastore.api.PartitionsStatsRequest;
@@ -119,6 +120,8 @@
 import org.apache.hadoop.hive.metastore.api.UnknownPartitionException;
 import org.apache.hadoop.hive.metastore.api.UnknownTableException;
 import org.apache.hadoop.hive.metastore.api.UnlockRequest;
+import org.apache.hadoop.hive.metastore.partition.spec.CompositePartitionSpecProxy;
+import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
 import org.apache.hadoop.hive.metastore.txn.TxnHandler;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -495,6 +498,11 @@
     return needResults ? result.getPartitions() : null;
   }

+  @Override
+  public int add_partitions_pspec(PartitionSpecProxy partitionSpec) throws TException {
+    return client.add_partitions_pspec(partitionSpec.toPartitionSpec());
+  }
+
   /**
    * @param table_name
    * @param db_name
@@ -897,6 +905,11 @@
   }

   @Override
+  public PartitionSpecProxy listPartitionSpecs(String dbName, String tableName, int maxParts) throws TException {
+    return PartitionSpecProxy.Factory.get(client.get_partitions_pspec(dbName, tableName, maxParts));
+  }
+
+  @Override
   public List<Partition> listPartitions(String db_name, String tbl_name,
       List<String> part_vals, short max_parts)
       throws NoSuchObjectException, MetaException, TException {
@@ -941,6 +954,14 @@
       NoSuchObjectException, TException {
     return deepCopyPartitions(
         client.get_partitions_by_filter(db_name, tbl_name, filter, max_parts));
+  }
+
+  @Override
+  public PartitionSpecProxy listPartitionSpecsByFilter(String db_name, String tbl_name,
+      String filter, int max_parts) throws MetaException,
+      NoSuchObjectException, TException {
+    return PartitionSpecProxy.Factory.get(
+        client.get_part_specs_by_filter(db_name, tbl_name, filter, max_parts));
   }

   @Override
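Taken together, the client additions mirror the existing List-of-Partition calls but keep the grouped representation end to end, trading one Thrift round-trip per batch for the per-partition calls. A minimal usage sketch; the filter string is a hypothetical example of the metastore's partition-filter syntax, and partitionSpecProxy is assumed to have been built elsewhere (for example via PartitionSpecProxy.Factory.get(...) as in the patch above).

    // Push a batch of partitions as one grouped PartitionSpec in a single Thrift call.
    int nAdded = client.add_partitions_pspec(partitionSpecProxy);

    // Retrieve a filtered subset in the same grouped form.
    PartitionSpecProxy filtered = client.listPartitionSpecsByFilter(
        "testpartitionspecs_db", "testpartitionspecs_table", "dt > \"20140105\"", -1);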
Index: metastore/src/java/org/apache/hadoop/hive/metastore/events/AddPartitionEvent.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/events/AddPartitionEvent.java (revision 1616367)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/events/AddPartitionEvent.java (revision )
@@ -21,19 +21,23 @@
 import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;

 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;

 public class AddPartitionEvent extends ListenerEvent {

   private final Table table;
   private final List<Partition> partitions;
+  private PartitionSpecProxy partitionSpecProxy;

   public AddPartitionEvent(Table table, List<Partition> partitions, boolean status, HMSHandler handler) {
     super(status, handler);
     this.table = table;
     this.partitions = partitions;
+    this.partitionSpecProxy = null;
   }

   public AddPartitionEvent(Table table, Partition partition, boolean status, HMSHandler handler) {
@@ -41,6 +45,16 @@
   }

   /**
+   * Alternative constructor to use PartitionSpec APIs.
+   */
+  public AddPartitionEvent(Table table, PartitionSpecProxy partitionSpec, boolean status, HMSHandler handler) {
+    super(status, handler);
+    this.table = table;
+    this.partitions = null;
+    this.partitionSpecProxy = partitionSpec;
+  }
+
+  /**
    * @return The table.
    */
   public Table getTable() {
@@ -52,6 +66,13 @@
    */
   public List<Partition> getPartitions() {
     return partitions;
+  }
+
+  /**
+   * @return Iterator for partitions.
+   */
+  public Iterator<Partition> getPartitionIterator() {
+    return partitionSpecProxy == null ? null : partitionSpecProxy.getPartitionIterator();
+  }
 }
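Because events raised through the PartitionSpec path carry no materialized partition list, downstream consumers should prefer getPartitionIterator() over getPartitions(). A sketch of a listener handling both event flavors; LoggingPartitionListener is a hypothetical class written for illustration, while MetaStoreEventListener and its onAddPartition hook are the existing extension points.

    // Sketch: log every added partition, whichever AddPartitionEvent constructor was used.
    public class LoggingPartitionListener extends MetaStoreEventListener {

      public LoggingPartitionListener(Configuration config) {
        super(config);
      }

      @Override
      public void onAddPartition(AddPartitionEvent event) throws MetaException {
        Iterator<Partition> iterator = event.getPartitionIterator();
        if (iterator == null && event.getPartitions() != null) {
          iterator = event.getPartitions().iterator(); // List-based constructor was used.
        }
        while (iterator != null && iterator.hasNext()) {
          System.out.println("Added partition: " + iterator.next().getValues());
        }
      }
    }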