commit 2e3988c5fd5d08d5527e736acd3318049c22cef1
Author: Mithun RK
Date:   Wed Nov 29 13:29:04 2017 -0800

    HIVE-17467: HCatClient APIs for discovering partition key-values

diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java
index ab563888c9..6d221aab6d 100644
--- a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java
+++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java
@@ -348,6 +348,73 @@ public abstract HCatPartition getPartition(String dbName, String tableName,
       Map<String, String> partitionSpec) throws HCatException;
 
   /**
+   * Checks whether any partitions exist that satisfy the (possibly partial) partitionSpec.
+   * @param dbName The database name.
+   * @param tableName The table name.
+   * @param partitionSpec The partition specification. (Need not include all partition keys.)
+   * @return True if at least one partition exists that matches the partitionSpec, else false.
+   * @throws HCatException On failure.
+   */
+  public abstract boolean partitionExists(String dbName, String tableName, Map<String, String> partitionSpec)
+      throws HCatException;
+
+  /**
+   * Arguments class for partition key/value queries, used with
+   * {@link #getPartitionKeyValues(String, String, PartKeyValueArgs)}.
+   */
+  public static class PartKeyValueArgs {
+
+    List<String> requiredPartKeys = null;
+    int maxReturnValues = -1;
+    String filter = null;
+    boolean ascending = true;
+
+    /**
+     * List of partition-keys for which unique values are to be returned.
+     */
+    public PartKeyValueArgs requiredPartKeys(List<String> requiredPartKeys) {
+      this.requiredPartKeys = requiredPartKeys;
+      return this;
+    }
+
+    /**
+     * The maximum number of return-values.
+     */
+    public PartKeyValueArgs maxReturnValues(int maxReturnValues) {
+      this.maxReturnValues = maxReturnValues;
+      return this;
+    }
+
+    /**
+     * Filter string, to specify additional constraints on the partition key/values.
+     * The string follows the format of a partition-key filter.
+     * (E.g. "((dt >= '20150101') AND (dt <= '20151231'))").
+     */
+    public PartKeyValueArgs filter(String filter) {
+      this.filter = filter;
+      return this;
+    }
+
+    /**
+     * Whether key-values are returned in ascending lexicographic order (the default),
+     * or descending.
+     */
+    public PartKeyValueArgs ascending(boolean ascending) {
+      this.ascending = ascending;
+      return this;
+    }
+  }
+
+  /**
+   * Lists unique sets of values for the specified partition-keys.
+   * @param dbName The database name.
+   * @param tableName The table name.
+   * @param args PartKeyValueArgs that identify the part-keys being queried.
+   * @return List of Maps, one per unique combination of values for the specified
+   *         partition-keys. Sorted lexicographically by key, in the specified order.
+   * @throws HCatException On failure.
+   */
+  public abstract List<Map<String, String>> getPartitionKeyValues(String dbName, String tableName,
+      PartKeyValueArgs args) throws HCatException;
+
+  /**
    * Adds the partition.
    *
    * @param partInfo An instance of HCatAddPartitionDesc.
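For illustration, a minimal usage sketch of the two new APIs. The database/table names and the filter are hypothetical examples, not part of the patch; the HCatClient.create(...) bootstrap mirrors the test code further below.

    // Hypothetical usage sketch of the new APIs:
    HCatClient client = HCatClient.create(new Configuration(hiveConf));

    // Does any partition match a *partial* spec (here, only the 'grid' key)?
    Map<String, String> partialSpec = new HashMap<>();
    partialSpec.put("grid", "A");
    boolean anyMatch = client.partitionExists("mydb", "my_table", partialSpec);

    // The 10 most recent unique 'dt' values under grid 'A':
    List<Map<String, String>> latestDates = client.getPartitionKeyValues(
        "mydb", "my_table",
        new HCatClient.PartKeyValueArgs()
            .requiredPartKeys(Collections.singletonList("dt"))
            .filter("grid = 'A'")
            .maxReturnValues(10)
            .ascending(false));
    // Each returned map holds one entry per requested part-key, e.g. {dt=20151231}.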
diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java
index 17b9d03a21..9daaef07db 100644
--- a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java
+++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java
@@ -24,6 +24,8 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import javax.annotation.Nullable;
 
@@ -48,6 +50,9 @@
 import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PartitionEventType;
+import org.apache.hadoop.hive.metastore.api.PartitionValuesRequest;
+import org.apache.hadoop.hive.metastore.api.PartitionValuesResponse;
+import org.apache.hadoop.hive.metastore.api.PartitionValuesRow;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
@@ -470,6 +475,121 @@ public HCatPartition getPartition(String dbName, String tableName,
   }
 
   @Override
+  public boolean partitionExists(String dbName, String tableName, Map<String, String> partitionSpec) throws HCatException {
+    try {
+      dbName = checkDB(dbName);
+      Table table = hmsClient.getTable(dbName, tableName);
+      List<FieldSchema> tablePartitionKeys = table.getPartitionKeys();
+
+      if (tablePartitionKeys.size() == 0) {
+        throw new HCatException("Unpartitioned table: " + dbName + "." + tableName);
+      }
+
+      PartitionValuesRequest req = new PartitionValuesRequest(dbName,
+          tableName,
+          getFieldSchemasForPartKeys(
+              Lists.newArrayList(partitionSpec.keySet()),
+              tablePartitionKeys)
+      );
+      req.setFilter(getFilterString(partitionSpec));
+      req.setMaxParts(1);
+      PartitionValuesResponse response = hmsClient.listPartitionValues(req);
+
+      return response.isSetPartitionValues() && response.getPartitionValuesSize() > 0;
+    } catch (MetaException e) {
+      throw new HCatException("MetaException while checking for partitions.", e);
+    } catch (NoSuchObjectException e) {
+      throw new ObjectNotFoundException(
+          "NoSuchObjectException while checking for partitions.", e);
+    } catch (TException e) {
+      throw new ConnectionFailureException(
+          "TException while checking for partitions.", e);
+    }
+  }
+
+  private static List<FieldSchema> getFieldSchemasForPartKeys(List<String> requestedPartKeys,
+                                                              List<FieldSchema> tablePartKeyFieldSchemas)
+      throws HCatException {
+
+    if (requestedPartKeys == null || requestedPartKeys.isEmpty()) {
+      LOG.warn("Required part-keys are not specified. Assuming all partition columns are required.");
+      return tablePartKeyFieldSchemas;
+    }
+
+    Set<String> tablePartKeys = tablePartKeyFieldSchemas.stream().map(FieldSchema::getName).collect(Collectors.toSet());
+    Set<String> requestedPartKeys_lowercase = requestedPartKeys.stream().map(String::toLowerCase).collect(Collectors.toSet());
+
+    if (!tablePartKeys.containsAll(requestedPartKeys_lowercase)) {
+      throw new HCatException("Invalid partition keys specified in " + requestedPartKeys_lowercase
+          + ". Allowed part-keys == " + tablePartKeys);
+    }
+
+    return tablePartKeyFieldSchemas.stream()
+        .filter(field -> requestedPartKeys_lowercase.contains(field.getName()))
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public List<Map<String, String>> getPartitionKeyValues(String dbName, String tableName, PartKeyValueArgs args) throws HCatException {
+    try {
+
+      if (args == null) {
+        // Assume defaults.
+        args = new PartKeyValueArgs();
+      }
+
+      LOG.debug("Fetching part-key-values for " + dbName + "." + tableName + " for part-keys: " + args.requiredPartKeys
+          + " with max " + args.maxReturnValues + " values, using filter-string: \"" + args.filter + "\"");
+      dbName = checkDB(dbName);
+
+      Table table = hmsClient.getTable(dbName, tableName);
+      final List<FieldSchema> requiredPartitionColumns = getFieldSchemasForPartKeys(args.requiredPartKeys,
+          table.getPartitionKeys());
+      // Change requiredPartKeys to use the same order as the table does.
+      args.requiredPartKeys = requiredPartitionColumns.stream().map(FieldSchema::getName).collect(Collectors.toList());
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("requiredPartKeys (in order of appearance in table definition): " + args.requiredPartKeys);
+      }
+
+      PartitionValuesRequest req = new PartitionValuesRequest(dbName, tableName, requiredPartitionColumns);
+      req.setMaxParts(args.maxReturnValues);
+      req.setAscending(args.ascending);
+      req.setFilter(args.filter);
+
+      // NOTE: HiveMetaStoreClient.listPartitionValues() doesn't currently order partition key-values correctly.
+      // E.g. listPartitionValues for a 'dt' partition key will not return a list sorted by 'dt' values.
+      // This is because of a bug in ObjectStore::getDistinctValuesForPartitionsNoTxn,
+      // addressed by the ObjectStore changes in this patch.
+
+      PartitionValuesResponse response = hmsClient.listPartitionValues(req);
+      List<Map<String, String>> ret = Lists.newArrayListWithExpectedSize(response.getPartitionValuesSize());
+      for (PartitionValuesRow row : response.getPartitionValues()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Processing row: " + row.getRow());
+        }
+        Map<String, String> partKeyVal = Maps.newHashMapWithExpectedSize(row.getRowSize());
+        for (int i = 0; i < row.getRowSize(); i++) {
+          partKeyVal.put(args.requiredPartKeys.get(i), row.getRow().get(i));
+        }
+        ret.add(partKeyVal);
+      }
+      return ret;
[Elided: the exception handling that closes getPartitionKeyValues(), analogous to partitionExists() above; lost in extraction.]

diff --git a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientGetPartitionValues.java b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientGetPartitionValues.java
new file mode 100644
[Elided: Apache license header, package declaration, imports, the class declaration for TestHCatClientGetPartitionValues, and its static fields (LOG, hcatConf, client, msPort, securityManager, useExternalMS, DB_NAME, TABLE_NAME); lost in extraction.]
+
+  static class Context {
+    String dbName;
+    String tableName;
+    List<String> allDates, allGrids;
+    HCatTable table = null;
+
+    static Context createContext(
+        String dbName,
+        String tableName,
+        List<String> allDates,
+        List<String> allGrids
+    ) {
+      Context context = new Context();
+      context.dbName = dbName;
+      context.tableName = tableName;
+      context.allDates = allDates;
+      context.allGrids = allGrids;
+      return context;
+    }
+
+  }
+
+  private static Context testContext;
+
+  private static void startMetaStoreServer() throws Exception {
+
+    hcatConf = new HiveConf(TestHCatClientGetPartitionValues.class);
+    String metastoreUri = System.getProperty("test." + HiveConf.ConfVars.METASTOREURIS.varname);
+    if (metastoreUri != null) {
+      hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, metastoreUri);
+      useExternalMS = true;
+      return;
+    }
+
+    System.setProperty(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS.varname,
+        DbNotificationListener.class.getName()); // turn on db notification listener on metastore
+    msPort = MetaStoreTestUtils.startMetaStore();
+    securityManager = System.getSecurityManager();
+    System.setSecurityManager(new NoExitSecurityManager());
+    hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + msPort);
+    hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
+    hcatConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+        HCatSemanticAnalyzer.class.getName());
+    hcatConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+    hcatConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+    hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+    System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " ");
+    System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " ");
+
+    client = HCatClient.create(new Configuration(hcatConf));
+  }
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    startMetaStoreServer();
+    testContext = Context.createContext(DB_NAME, TABLE_NAME, getDates(1, 31), getGrids('A', 'Z'));
+    createDb(testContext);
+    testContext.table = createTable(testContext);
+    createInitialPartitions(testContext);
+  }
+
+  private static List<String> getDates(int min, int max) {
+    List<String> allDates = Lists.newArrayListWithCapacity(max - min + 1);
+    for (int dt = min; dt <= max; ++dt) {
+      allDates.add(String.format("%02d", dt));
+    }
+    return allDates;
+  }
+
+  private static List<String> getGrids(char min, char max) {
+    List<String> allGrids = Lists.newArrayListWithCapacity(max - min + 1);
+    for (char grid = min; grid <= max; ++grid) {
+      allGrids.add(String.valueOf(grid));
+    }
+    return allGrids;
+  }
+
+  private static void createDb(Context context) throws Exception {
+    client.dropDatabase(context.dbName, true, HCatClient.DropDBMode.CASCADE);
+    client.createDatabase(HCatCreateDBDesc.create(context.dbName).build());
+  }
+
+  private static HCatTable createTable(Context context) throws Exception {
+    return createTable(
+        client, context.dbName, context.tableName,
+        new HashMap<String, Type>() {{ put("data_col_1", Type.INT); put("data_col_2", Type.STRING); }}, // Immaterial.
+        new HashMap<String, Type>() {{ put("grid", Type.STRING); put("dt", Type.STRING); }}
+    );
+  }
+
+  private static HCatTable createTable(HCatClient client,
+                                       String dbName,
+                                       String tableName,
+                                       Map<String, Type> columnSchema,
+                                       Map<String, Type> partitionSchema) throws Exception {
+
+    client.dropTable(dbName, tableName, true);
+    HCatTable table = new HCatTable(dbName, tableName)
+        .cols(constructSchema(columnSchema))
+        .partCols(constructSchema(partitionSchema));
+
+    client.createTable(HCatCreateTableDesc.create(table, false).build());
+    table = client.getTable(dbName, tableName);
+    assertNotNull("Table could not be queried.", table);
+    return table;
+  }
+
+  @SuppressWarnings("deprecation")
+  private static List<HCatFieldSchema> constructSchema(Map<String, Type> columnSchema) {
+    return columnSchema.entrySet().stream().map(input -> {
+      try {
+        return new HCatFieldSchema(input.getKey(), input.getValue(), "");
+      }
+      catch (Throwable t) {
+        fail("Unexpected exception! " + t.getMessage());
+        return null;
+      }
+    }).collect(Collectors.toList());
+  }
+
+  private static String makePartLocation(HCatTable table, Map<String, String> partitionSpec) throws MetaException {
+    return (new Path(table.getSd().getLocation(), Warehouse.makePartPath(partitionSpec))).toUri().toString();
+  }
+
+  private static void createInitialPartitions(Context context) throws Exception {
+    // Populate partitions.
+    // [ {1, A}, {1, B}, ..., {31, Z} ]
+    List<Map<String, String>> allPartSpecs = Lists.newArrayListWithExpectedSize(
+        context.allDates.size() * context.allGrids.size());
+    for (final String dt : context.allDates) {
+      for (final String grid : context.allGrids) {
+        allPartSpecs.add(new HashMap<String, String>() {{
+          put("dt", dt);
+          put("grid", grid);
+        }});
+      }
+    }
+    createInitialPartitions(client, context.table, allPartSpecs);
+  }
+
+  private static void createInitialPartitions(HCatClient client, HCatTable table, List<Map<String, String>> partitionSpecs)
+      throws Exception {
+    for (Map<String, String> partitionSpec : partitionSpecs) {
+      addPartition(client, table, partitionSpec);
+    }
+  }
+
+  private static void addPartition(HCatClient client, HCatTable table, Map<String, String> partitionSpec) throws Exception {
+    client.addPartition(createAddPartitionDesc(table, partitionSpec));
+  }
+
+  private static HCatAddPartitionDesc createAddPartitionDesc(HCatTable table, Map<String, String> partitionSpec) throws Exception {
+    return HCatAddPartitionDesc.create(
+        new HCatPartition(table,
+            partitionSpec,
+            makePartLocation(table, partitionSpec)))
+        .build();
+  }
+
+  @Test
+  public void testAllDatesAscending() throws Exception {
+    try {
+      // Fetching all unique 'dt' values should return all dates in ascending order.
+      List<Map<String, String>> parts =
+          client.getPartitionKeyValues(
+              DB_NAME,
+              TABLE_NAME,
+              new HCatClient.PartKeyValueArgs()
+                  .requiredPartKeys(Collections.singletonList("dt"))
+                  .ascending(true)
+          );
+      assertEquals("Should get " + testContext.allDates.size() + " unique dates.", testContext.allDates.size(), parts.size());
+      Iterator<Map<String, String>> partsIterator = parts.iterator();
+      for (String dt : testContext.allDates) {
+        Map<String, String> partSpec = partsIterator.next();
+        assertEquals("Partition keys not in expected order.", dt, partSpec.get("dt"));
+        assertFalse("Partition keys should not contain the \"grid\" key.",
+            partSpec.containsKey("grid"));
+      }
+    }
+    catch (Exception unexpected) {
+      LOG.error("Unexpected exception!", unexpected);
+      fail("Unexpected exception! " + unexpected.getMessage());
+    }
+  }
+
+  @Test
+  public void testAllDatesDescending() throws Exception {
+    try {
+      // Fetching all unique 'dt' values should return all dates in descending order.
+      List<Map<String, String>> parts =
+          client.getPartitionKeyValues(
+              DB_NAME,
+              TABLE_NAME,
+              new HCatClient.PartKeyValueArgs()
+                  .requiredPartKeys(Collections.singletonList("dt"))
+                  .ascending(false)
+          );
+      assertEquals("Should get " + testContext.allDates.size() + " unique dates.", testContext.allDates.size(), parts.size());
+
+      ListIterator<String> dateIter = testContext.allDates.listIterator(testContext.allDates.size());
+      for (Map<String, String> partSpec : parts) {
+        assertEquals("Partition keys not in expected order.", dateIter.previous(), partSpec.get("dt"));
+        assertFalse("Partition keys should not contain the \"grid\" key.",
+            partSpec.containsKey("grid"));
+      }
+    }
+    catch (Exception unexpected) {
+      LOG.error("Unexpected exception!", unexpected);
+      fail("Unexpected exception! " + unexpected.getMessage());
+    }
+  }
+
+  @Test
+  public void testAllDatesAndGridsDefaults() throws Exception {
+    try {
+      // If no args are specified,
+      // 1. All part-key-value combinations must be returned.
+      // 2. Key values must be sorted in ascending order.
+      List<Map<String, String>> parts = client.getPartitionKeyValues(
+          DB_NAME,
+          TABLE_NAME,
+          null
+      );
+
+      assertEquals("Should get n(unique_dates) * n(unique_grids).",
+          testContext.allDates.size() * testContext.allGrids.size(), parts.size());
+
+      Iterator<Map<String, String>> partIter = parts.iterator();
+      for (String dt : testContext.allDates) {
+        for (String grid : testContext.allGrids) {
+          Map<String, String> partSpec = partIter.next();
+          assertEquals("dt partition key not in order.", dt, partSpec.get("dt"));
+          assertEquals("grid partition key not in order.", grid, partSpec.get("grid"));
+        }
+      }
+    }
+    catch (Exception unexpected) {
+      LOG.error("Unexpected exception!", unexpected);
+      fail("Unexpected exception! " + unexpected.getMessage());
+    }
+  }
+
+  @Test
+  public void testAllDatesAndGridsWithRequiredColumnsUnspecified() throws Exception {
+    try {
+      // If required-columns are not specified,
+      // 1. All part-key-value combinations must be returned.
+      // 2. Key values must be sorted in the specified order (ascending, if unspecified).
+      List<Map<String, String>> parts = client.getPartitionKeyValues(
+          DB_NAME,
+          TABLE_NAME,
+          new HCatClient.PartKeyValueArgs()
+      );
+
+      assertEquals("Should get n(unique_dates) * n(unique_grids).",
+          testContext.allDates.size() * testContext.allGrids.size(), parts.size());
+
+      Iterator<Map<String, String>> partIter = parts.iterator();
+      for (String dt : testContext.allDates) {
+        for (String grid : testContext.allGrids) {
+          Map<String, String> partSpec = partIter.next();
+          assertEquals("dt partition key not in order.", dt, partSpec.get("dt"));
+          assertEquals("grid partition key not in order.", grid, partSpec.get("grid"));
+        }
+      }
+    }
+    catch (Exception unexpected) {
+      LOG.error("Unexpected exception!", unexpected);
+      fail("Unexpected exception! " + unexpected.getMessage());
+    }
+  }
+
+  @Test
+  public void testAllDatesAndGridsDescending() throws Exception {
+    try {
+      // Fetching all unique 'dt'+'grid' values should return both keys' values in descending order.
+      List<Map<String, String>> parts = client.getPartitionKeyValues(
+          DB_NAME,
+          TABLE_NAME,
+          new HCatClient.PartKeyValueArgs()
+              .requiredPartKeys(Arrays.asList("dt", "grid"))
+              .ascending(false)
+      );
+
+      assertEquals("Should get n(unique_dates) * n(unique_grids).",
+          testContext.allDates.size() * testContext.allGrids.size(), parts.size());
+
+      List<String> allDatesReversed = Lists.newArrayList(testContext.allDates);
+      Collections.reverse(allDatesReversed);
+      List<String> allGridsReversed = Lists.newArrayList(testContext.allGrids);
+      Collections.reverse(allGridsReversed);
+
+      Iterator<Map<String, String>> partIter = parts.iterator();
+      for (String dt : allDatesReversed) {
+        for (String grid : allGridsReversed) {
+          Map<String, String> partSpec = partIter.next();
+          assertEquals("dt partition key not in order.", dt, partSpec.get("dt"));
+          assertEquals("grid partition key not in order.", grid, partSpec.get("grid"));
+        }
+      }
+    }
+    catch (Exception unexpected) {
+      LOG.error("Unexpected exception!", unexpected);
+      fail("Unexpected exception! " + unexpected.getMessage());
+    }
+  }
+
+  @Test
+  public void testFilteredDatesAndGridsDescending() throws Exception {
+    try {
+      // With a filter applied, only the matching 'dt'+'grid' values should be returned, in descending order.
+      List<Map<String, String>> parts = client.getPartitionKeyValues(
+          DB_NAME,
+          TABLE_NAME,
+          new HCatClient.PartKeyValueArgs()
+              .requiredPartKeys(Arrays.asList("dt", "grid"))
+              .filter("dt >= \"20\" AND dt <= \"29\" AND (grid = \"A\" OR grid = \"Z\")")
+              .ascending(false)
+      );
+
+      List<String> datesReversed = Lists.newArrayList(testContext.allDates);
+      Collections.reverse(datesReversed);
+      List<String> dates = datesReversed.stream()
+          .filter(dt -> Integer.parseInt(dt) >= 20 && Integer.parseInt(dt) <= 29)
+          .collect(Collectors.toList());
+
+      List<String> grids = Lists.newArrayListWithExpectedSize(2);
+      grids.add("Z");
+      grids.add("A");
+
+      assertEquals("Should get 10*2 partition-specs, i.e. [ {29,Z}, {29,A}, {28,Z}, ..., {20,A} ].",
+          dates.size() * grids.size(), parts.size());
+
+      Iterator<Map<String, String>> partIter = parts.iterator();
+      for (String dt : dates) {
+        for (String grid : grids) {
+          Map<String, String> partSpec = partIter.next();
+          assertEquals("dt partition key not in order.", dt, partSpec.get("dt"));
+          assertEquals("grid partition key not in order.", grid, partSpec.get("grid"));
+        }
+      }
+    }
+    catch (Exception unexpected) {
+      LOG.error("Unexpected exception!", unexpected);
+      fail("Unexpected exception! " + unexpected.getMessage());
+    }
+  }
+
+  @Test
+  public void testNonUniformDatesAndGrids() throws Exception {
+
+    Context customContext = Context.createContext(
+        DB_NAME,
+        TABLE_NAME + "_non_uniform",
+        getDates(1, 31),
+        getGrids('A', 'J'));
+
+    customContext.table = createNonUniformTable(customContext);
+    createNonUniformPartitions(customContext);
+
+    // For grid=B, the lowest 2 values for dt should be 02 and 12.
+    List<Map<String, String>> partSpecs = client.getPartitionKeyValues(
+        customContext.dbName,
+        customContext.tableName,
+        new HCatClient.PartKeyValueArgs()
+            .requiredPartKeys(Lists.newArrayList("dt"))
+            .maxReturnValues(2)
+            .filter("grid='B'")
+            .ascending(true)
+    );
+    assertEquals(2, partSpecs.size());
+    assertEquals(1, partSpecs.get(0).size());
+    assertEquals("02", partSpecs.get(0).get("dt"));
+    assertEquals(1, partSpecs.get(1).size());
+    assertEquals("12", partSpecs.get(1).get("dt"));
+
+    // For (grid >= D AND grid <= G), the highest 6 dt values should be [27, 26, 25, 24, 17, 16].
+    partSpecs = client.getPartitionKeyValues(
+        customContext.dbName,
+        customContext.tableName,
+        new HCatClient.PartKeyValueArgs()
+            .requiredPartKeys(Lists.newArrayList("dt"))
+            .maxReturnValues(6)
+            .filter("grid >= \"D\" AND grid <= \"G\"")
+            .ascending(false)
+    );
+    assertEquals(6, partSpecs.size());
+    Iterator<Map<String, String>> partsIter = partSpecs.iterator();
+    for (String dt : Arrays.asList("27", "26", "25", "24", "17", "16")) {
+      Map<String, String> partSpec = partsIter.next();
+      assertEquals(dt, partSpec.get("dt"));
+      assertEquals(1, partSpec.size());
+    }
+
+    // Without filters, all dt values [01-31] should be retrieved.
+    partSpecs = client.getPartitionKeyValues(
+        customContext.dbName,
+        customContext.tableName,
+        new HCatClient.PartKeyValueArgs()
+            .requiredPartKeys(Lists.newArrayList("dt"))
+            .ascending(true)
+    );
+
+    assertEquals(31, partSpecs.size());
+    partsIter = partSpecs.iterator();
+    for (int dt = 1; dt <= 31; ++dt) {
+      String dtString = String.format("%02d", dt);
+      Map<String, String> partSpec = partsIter.next();
+      assertEquals(dtString, partSpec.get("dt"));
+      assertEquals(1, partSpec.size());
+    }
+  }
+
+  private static HCatTable createNonUniformTable(Context context) throws Exception {
+    return createTable(
+        client, context.dbName, context.tableName,
+        new HashMap<String, Type>() {{ put("data_col_1", Type.INT); put("data_col_2", Type.STRING); }}, // Immaterial.
+        new HashMap<String, Type>() {{ put("grid", Type.STRING); put("dt", Type.STRING); }}
+    );
+  }
+
+  private static void createNonUniformPartitions(Context context) throws Exception {
+
+    // Distribute partitions: 10 grids A-J. Dates 1-31 are spread across them, round-robin:
+    // 1->A, 2->B, 3->C, ..., 10->J, 11->A, 12->B, ...
+
+    Map<String, String> partSpec = Maps.newHashMap();
+    int gridIndex = 0;
+    for (String dt : context.allDates) {
+      partSpec.put("dt", dt);
+      partSpec.put("grid", context.allGrids.get(gridIndex++ % context.allGrids.size()));
+      addPartition(client, context.table, partSpec);
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    cleanupDatabaseQuietly(client, DB_NAME);
+    if (!useExternalMS) {
+      LOG.info("Shutting down metastore.");
+      System.setSecurityManager(securityManager);
+    }
+  }
+
+  private static void cleanupDatabaseQuietly(HCatClient client, String dbName) {
+    try {
+      if (client != null) {
+        client.dropDatabase(dbName, true, HCatClient.DropDBMode.CASCADE);
+      }
+    }
+    catch (Exception exception) {
+      fail("Unexpected exception! " + exception.getMessage());
+    }
+  }
+
+}
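The remaining hunks rework ObjectStore.listPartitionValues() on the metastore side: it now first attempts a direct JDOQL query with the filter pushed down and explicit per-key ordering, and falls back to deriving values from partition names only if that fails. A rough sketch of the new control flow, abridged from the hunks below (method names are from the diff; the surrounding code is omitted):

    try {
      // Preferred path: compile the filter into the JDOQL query itself; the
      // datastore applies DISTINCT, per-key ORDER BY, and the row-range limit.
      return getDistinctValuesForPartitionKeys(dbName, tableName, cols,
          applyDistinct, ascending, maxParts, filter);
    } catch (Exception exception) {
      // Fallback path: enumerate matching partition names and compute the
      // distinct key-values from those names.
      LOG.warn("Error in pushing down filter. Falling back to calculation with partition-names.", exception);
      return extractPartitionNamesByFilter(dbName, tableName, filter, cols,
          ascending, applyDistinct, maxParts);
    }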
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 63cb52e8d3..0e9b2e0a32 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -2437,12 +2437,14 @@ public PartitionValuesResponse listPartitionValues(String dbName, String tableNa
     dbName = dbName.toLowerCase().trim();
     tableName = tableName.toLowerCase().trim();
     try {
-      if (filter == null || filter.isEmpty()) {
+      try {
         PartitionValuesResponse response =
-            getDistinctValuesForPartitionsNoTxn(dbName, tableName, cols, applyDistinct, ascending, maxParts);
+            getDistinctValuesForPartitionKeys(dbName, tableName, cols, applyDistinct, ascending, maxParts, filter);
         LOG.info("Number of records fetched: {}", response.getPartitionValues().size());
         return response;
-      } else {
+      }
+      catch (Exception exception) {
+        LOG.warn("Error in pushing down filter. Falling back to calculation with partition-names.", exception);
         PartitionValuesResponse response = extractPartitionNamesByFilter(dbName, tableName, filter, cols,
             ascending, applyDistinct, maxParts);
         if (response != null && response.getPartitionValues() != null) {
@@ -2453,7 +2455,6 @@ public PartitionValuesResponse listPartitionValues(String dbName, String tableNa
     } catch (Exception t) {
       LOG.error("Exception in ORM", t);
       throw new MetaException("Error retrieving partition values: " + t);
-    } finally {
     }
   }
 
@@ -2469,7 +2470,7 @@ private PartitionValuesResponse extractPartitionNamesByFilter(String dbName, Str
       // Get partitions by name - ascending or descending
       partitionNames = getPartitionNamesByFilter(dbName, tableName, filter, ascending, maxParts);
     } catch (MetaException e) {
-      LOG.warn("Querying by partition names failed, trying out with partition objects, filter: {}", filter);
+      LOG.warn("Querying by partition names failed, trying out with partition objects, filter: " + filter, e);
     }
 
     if (partitionNames == null) {
@@ -2569,43 +2570,56 @@ private PartitionValuesResponse extractPartitionNamesByFilter(String dbName, Str
     return partNames;
   }
 
-  private PartitionValuesResponse getDistinctValuesForPartitionsNoTxn(String dbName, String tableName, List<FieldSchema> cols,
-      boolean applyDistinct, boolean ascending, long maxParts)
-      throws MetaException {
+  private PartitionValuesResponse getDistinctValuesForPartitionKeys(
+      String dbName, String tableName, List<FieldSchema> cols,
+      boolean applyDistinct, boolean ascending, long maxParts, String filter) throws MetaException {
     try {
       openTransaction();
+      MTable mTable = getMTable(dbName, tableName);
+      if (mTable == null) {
+        throw new MetaException("Could not find table: " + dbName + "." + tableName);
+      }
+      Map<String, Object> params = Maps.newHashMap();
+      String queryFilterString = makeQueryFilterString(dbName, mTable, filter, params);
       Query q = pm.newQuery("select partitionName from org.apache.hadoop.hive.metastore.model.MPartition "
-          + "where table.database.name == t1 && table.tableName == t2 ");
-      q.declareParameters("java.lang.String t1, java.lang.String t2");
+          + "where " + queryFilterString);
 
-      // TODO: Ordering seems to affect the distinctness, needs checking, disabling.
-/*
-      if (ascending) {
-        q.setOrdering("partitionName ascending");
-      } else {
-        q.setOrdering("partitionName descending");
+      String parameterDeclaration = makeParameterDeclarationStringObj(params);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Filter specified is " + filter + ", JDOQL filter is " + queryFilterString);
+        LOG.debug("Params is " + params);
+        LOG.debug("ParameterDeclaration: " + parameterDeclaration);
       }
-*/
+      q.declareParameters(parameterDeclaration);
+
       if (maxParts > 0) {
         q.setRange(0, maxParts);
       }
       StringBuilder partValuesSelect = new StringBuilder(256);
+      StringBuilder orderByExpression = new StringBuilder();
       if (applyDistinct) {
         partValuesSelect.append("DISTINCT ");
       }
+
+      String partKeyOrdering = ascending ? " ASCENDING" : " DESCENDING";
" ASCENDING" : " DESCENDING"; List partitionKeys = getTable(dbName, tableName).getPartitionKeys(); for (FieldSchema key : cols) { - partValuesSelect.append(extractPartitionKey(key, partitionKeys)).append(", "); + String partitionKeyExtraction = extractPartitionKey(key, partitionKeys); + partValuesSelect.append(partitionKeyExtraction).append(", "); + orderByExpression.append(partitionKeyExtraction).append(partKeyOrdering).append(", "); } partValuesSelect.setLength(partValuesSelect.length() - 2); + orderByExpression.setLength(orderByExpression.length() - 2); LOG.info("Columns to be selected from Partitions: {}", partValuesSelect); + LOG.info("Ordering: " + orderByExpression); q.setResult(partValuesSelect.toString()); + q.setOrdering(orderByExpression.toString()); PartitionValuesResponse response = new PartitionValuesResponse(); response.setPartitionValues(new ArrayList()); if (cols.size() > 1) { - List results = (List) q.execute(dbName, tableName); + List results = (List) q.executeWithMap(params); for (Object[] row : results) { PartitionValuesRow rowResponse = new PartitionValuesRow(); for (Object columnValue : row) { @@ -2614,7 +2628,7 @@ private PartitionValuesResponse getDistinctValuesForPartitionsNoTxn(String dbNam response.addToPartitionValues(rowResponse); } } else { - List results = (List) q.execute(dbName, tableName); + List results = (List) q.executeWithMap(params); for (Object row : results) { PartitionValuesRow rowResponse = new PartitionValuesRow(); rowResponse.addToRow((String) row);