diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java index 997f5fd..4780f76 100644 --- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java @@ -141,7 +141,7 @@ private String DBS, TBLS, PARTITIONS, DATABASE_PARAMS, PARTITION_PARAMS, SORT_COLS, SD_PARAMS, SDS, SERDES, SKEWED_STRING_LIST_VALUES, SKEWED_VALUES, BUCKETING_COLS, SKEWED_COL_NAMES, SKEWED_COL_VALUE_LOC_MAP, COLUMNS_V2, PARTITION_KEYS, SERDE_PARAMS, PART_COL_STATS, KEY_CONSTRAINTS, - TAB_COL_STATS, PARTITION_KEY_VALS; + TAB_COL_STATS, PARTITION_KEY_VALS, PART_PRIVS, PART_COL_PRIVS, SKEWED_STRING_LIST, CDS; public MetaStoreDirectSql(PersistenceManager pm, Configuration conf, String schema) { this.pm = pm; @@ -459,8 +459,12 @@ public Database getDatabase(String catName, String dbName) throws MetaException{ @Override public List run(List input) throws MetaException { String filter = "" + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(input.size()) + ")"; - return getPartitionsViaSqlFilterInternal(catName, dbName, tblName, null, filter, input, - Collections.emptyList(), null); + List partitionIds = getPartitionIdsViaSqlFilter(catName, dbName, tblName, + filter, input, Collections.emptyList(), null); + if (partitionIds.isEmpty()) { + return Collections.emptyList(); // no partitions, bail early. + } + return getPartitionsFromPartitionIds(catName, dbName, tblName, null, partitionIds); } }); } @@ -476,8 +480,19 @@ public Database getDatabase(String catName, String dbName) throws MetaException{ Boolean isViewTable = isViewTable(filter.table); String catName = filter.table.isSetCatName() ? 
filter.table.getCatName() : DEFAULT_CATALOG_NAME; - return getPartitionsViaSqlFilterInternal(catName, filter.table.getDbName(), - filter.table.getTableName(), isViewTable, filter.filter, filter.params, filter.joins, max); + List partitionIds = getPartitionIdsViaSqlFilter(catName, + filter.table.getDbName(), filter.table.getTableName(), filter.filter, filter.params, + filter.joins, max); + if (partitionIds.isEmpty()) { + return Collections.emptyList(); // no partitions, bail early. + } + return runBatched(partitionIds, new Batchable() { + @Override + public List run(List input) throws MetaException { + return getPartitionsFromPartitionIds(catName, filter.table.getDbName(), + filter.table.getTableName(), isViewTable, input); + } + }); } public static class SqlFilterForPushdown { @@ -507,8 +522,20 @@ public boolean generateSqlFilterForPushdown( */ public List getPartitions(String catName, String dbName, String tblName, Integer max) throws MetaException { - return getPartitionsViaSqlFilterInternal(catName, dbName, tblName, null, - null, Collections.emptyList(), Collections.emptyList(), max); + List partitionIds = getPartitionIdsViaSqlFilter(catName, dbName, + tblName, null, Collections.emptyList(), Collections.emptyList(), max); + if (partitionIds.isEmpty()) { + return Collections.emptyList(); // no partitions, bail early. + } + + // Get full objects. For Oracle/etc. do it in batches. 
+ List result = runBatched(partitionIds, new Batchable() { + @Override + public List run(List input) throws MetaException { + return getPartitionsFromPartitionIds(catName, dbName, tblName, null, input); + } + }); + return result; } private static Boolean isViewTable(Table t) { @@ -535,12 +562,11 @@ private boolean isViewTable(String catName, String dbName, String tblName) throw } /** - * Get partition objects for the query using direct SQL queries, to avoid bazillion + * Get partition ids for the query using direct SQL queries, to avoid bazillion * queries created by DN retrieving stuff for each object individually. - * @param dbName Metastore db name. - * @param tblName Metastore table name. - * @param isView Whether table is a view. Can be passed as null if not immediately - * known, then this method will get it only if necessary. + * @param catName MetaStore catalog name + * @param dbName MetaStore db name + * @param tblName MetaStore table name * @param sqlFilter SQL filter to use. Better be SQL92-compliant. * @param paramsForFilter params for ?-s in SQL filter text. Params must be in order. * @param joinsForFilter if the filter needs additional join statement, they must be in @@ -548,24 +574,18 @@ private boolean isViewTable(String catName, String dbName, String tblName) throw * @param max The maximum number of partitions to return. * @return List of partition objects. 
*/ - private List getPartitionsViaSqlFilterInternal( - String catName, String dbName, String tblName, final Boolean isView, String sqlFilter, - List paramsForFilter, List joinsForFilter,Integer max) + private List getPartitionIdsViaSqlFilter( + String catName, String dbName, String tblName, String sqlFilter, + List paramsForFilter, List joinsForFilter, Integer max) throws MetaException { boolean doTrace = LOG.isDebugEnabled(); - final String dbNameLcase = dbName.toLowerCase(), tblNameLcase = tblName.toLowerCase(); - final String catNameLcase = normalizeSpace(catName); + final String dbNameLcase = dbName.toLowerCase(); + final String tblNameLcase = tblName.toLowerCase(); + final String catNameLcase = normalizeSpace(catName).toLowerCase(); + // We have to be mindful of order during filtering if we are not returning all partitions. String orderForFilter = (max != null) ? " order by \"PART_NAME\" asc" : ""; - // Get all simple fields for partitions and related objects, which we can map one-on-one. - // We will do this in 2 queries to use different existing indices for each one. - // We do not get table and DB name, assuming they are the same as we are using to filter. - // TODO: We might want to tune the indexes instead. With current ones MySQL performs - // poorly, esp. with 'order by' w/o index on large tables, even if the number of actual - // results is small (query that returns 8 out of 32k partitions can go 4sec. to 0sec. by - // just adding a \"PART_ID\" IN (...) filter that doesn't alter the results to it, probably - // causing it to not sort the entire table due to not knowing how selective the filter is. 
String queryText = "select " + PARTITIONS + ".\"PART_ID\" from " + PARTITIONS + "" + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\" " @@ -594,16 +614,10 @@ private boolean isViewTable(String catName, String dbName, String tblName) throw if (sqlResult.isEmpty()) { return Collections.emptyList(); // no partitions, bail early. } - - // Get full objects. For Oracle/etc. do it in batches. - List result = runBatched(sqlResult, new Batchable() { - @Override - public List run(List input) throws MetaException { - return getPartitionsFromPartitionIds(catNameLcase, dbNameLcase, tblNameLcase, isView, - input); - } - }); - + List result = new ArrayList(sqlResult.size()); + for (Object fields : sqlResult) { + result.add(extractSqlLong(fields)); + } query.closeAll(); return result; } @@ -612,14 +626,11 @@ private boolean isViewTable(String catName, String dbName, String tblName) throw private List getPartitionsFromPartitionIds(String catName, String dbName, String tblName, Boolean isView, List partIdList) throws MetaException { boolean doTrace = LOG.isDebugEnabled(); + int idStringWidth = (int)Math.ceil(Math.log10(partIdList.size())) + 1; // 1 for comma int sbCapacity = partIdList.size() * idStringWidth; - // Prepare StringBuilder for "PART_ID in (...)" to use in future queries. - StringBuilder partSb = new StringBuilder(sbCapacity); - for (Object partitionId : partIdList) { - partSb.append(extractSqlLong(partitionId)).append(","); - } - String partIds = trimCommaList(partSb); + + String partIds = getIdListForIn(partIdList); // Get most of the fields for the IDs provided. // Assume db and table names are the same for all partition, as provided in arguments. @@ -652,7 +663,7 @@ private boolean isViewTable(String catName, String dbName, String tblName) throw StringBuilder colsSb = new StringBuilder(7); // We expect that there's only one field schema. 
tblName = tblName.toLowerCase(); dbName = dbName.toLowerCase(); - catName = catName.toLowerCase(); + catName = normalizeSpace(catName).toLowerCase(); for (Object[] fields : sqlResult) { // Here comes the ugly part... long partitionId = extractSqlLong(fields[0]); @@ -1060,6 +1071,23 @@ else if (value instanceof byte[]) { } } + /** + * Helper method for preparing for "SOMETHING_ID in (...)" to use in future queries. + * @param objectIds the objectId collection + * @return The concatenated list + * @throws MetaException If the list contains wrong data + */ + private static String getIdListForIn(List objectIds) throws MetaException { + int idStringWidth = (int)Math.ceil(Math.log10(objectIds.size())) + 1; // 1 for comma + int sbCapacity = objectIds.size() * idStringWidth; + // Prepare StringBuilder for "SOMETHING_ID in (...)" to use in future queries. + StringBuilder partSb = new StringBuilder(sbCapacity); + for (Object partitionId : objectIds) { + partSb.append(extractSqlLong(partitionId)).append(","); + } + return trimCommaList(partSb); + } + private static String trimCommaList(StringBuilder sb) { if (sb.length() > 0) { sb.setLength(sb.length() - 1); @@ -2458,4 +2486,343 @@ public void closeAllQueries() { return ret; } + /** + * Drop partitions by using direct SQL queries. + * @param catName Metastore catalog name. + * @param dbName Metastore db name. + * @param tblName Metastore table name. + * @param partNames Names of the partitions to drop. + * @throws MetaException If looking up the partition ids or deleting the partitions fails. 
+ */ + public void dropPartitionsViaSqlFilter(final String catName, final String dbName, + final String tblName, List partNames) + throws MetaException { + if (partNames.isEmpty()) { + return; + } + + runBatched(partNames, new Batchable() { + @Override + public List run(List input) throws MetaException { + String filter = "" + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(input.size()) + ")"; + // Get partition ids + List partitionIds = getPartitionIdsViaSqlFilter(catName, dbName, tblName, + filter, input, Collections.emptyList(), null); + if (partitionIds.isEmpty()) { + return Collections.emptyList(); // no partitions, bail early. + } + dropPartitionsByPartitionIds(partitionIds); + return Collections.emptyList(); + } + }); + } + + + /** + * Drops Partition-s. Should be called with the list short enough to not trip up Oracle/etc. + * @param partitionIdList The partition identifiers to drop + * @throws MetaException If there is an SQL exception during the execution, it is converted to a + * MetaException + */ + private void dropPartitionsByPartitionIds(List partitionIdList) throws MetaException { + String queryText; + long start, queryTime; + boolean doTrace = LOG.isDebugEnabled(); + + String partitionIds = getIdListForIn(partitionIdList); + + // Get the corresponding SD_ID-s, CD_ID-s, SERDE_ID-s + queryText = + "SELECT " + SDS + ".\"SD_ID\", " + SDS + ".\"CD_ID\", " + SDS + ".\"SERDE_ID\" " + + "from " + SDS + " " + + "INNER JOIN " + PARTITIONS + " ON " + PARTITIONS + ".\"SD_ID\" = " + SDS + ".\"SD_ID\" " + + "WHERE " + PARTITIONS + ".\"PART_ID\" in (" + partitionIds + ")"; + + Query query = pm.newQuery("javax.jdo.query.SQL", queryText); + List sqlResult = ensureList(executeWithArray(query, null, queryText)); + + List sdIdList = new ArrayList<>(partitionIdList.size()); + List columnDescriptorIdList = new ArrayList<>(1); + List serdeIdList = new ArrayList<>(partitionIdList.size()); + + if (!sqlResult.isEmpty()) { + for (Object[] fields : sqlResult) { + 
sdIdList.add(extractSqlLong(fields[0])); + Long colId = extractSqlLong(fields[1]); + if (!columnDescriptorIdList.contains(colId)) { + columnDescriptorIdList.add(colId); + } + serdeIdList.add(extractSqlLong(fields[2])); + } + } + query.closeAll(); + + try { + // Drop privileges + queryText = "delete from " + PART_PRIVS + " where \"PART_ID\" in (" + partitionIds + ")"; + start = doTrace ? System.nanoTime() : 0; + executeNoResult(queryText); + queryTime = doTrace ? System.nanoTime() : 0; + timingTrace(doTrace, queryText, start, queryTime); + Deadline.checkTimeout(); + + // Drop column level privileges + queryText = "delete from " + PART_COL_PRIVS + " where \"PART_ID\" in (" + partitionIds + ")"; + start = doTrace ? System.nanoTime() : 0; + executeNoResult(queryText); + queryTime = doTrace ? System.nanoTime() : 0; + timingTrace(doTrace, queryText, start, queryTime); + Deadline.checkTimeout(); + + // Drop partition statistics + queryText = "delete from " + PART_COL_STATS + " where \"PART_ID\" in (" + partitionIds + ")"; + start = doTrace ? System.nanoTime() : 0; + executeNoResult(queryText); + queryTime = doTrace ? System.nanoTime() : 0; + timingTrace(doTrace, queryText, start, queryTime); + Deadline.checkTimeout(); + + // Drop the partition params + queryText = "delete from " + PARTITION_PARAMS + " where \"PART_ID\" in (" + + partitionIds + ")"; + start = doTrace ? System.nanoTime() : 0; + executeNoResult(queryText); + queryTime = doTrace ? System.nanoTime() : 0; + timingTrace(doTrace, queryText, start, queryTime); + Deadline.checkTimeout(); + + // Drop the partition key vals + queryText = "delete from " + PARTITION_KEY_VALS + " where \"PART_ID\" in (" + + partitionIds + ")"; + start = doTrace ? System.nanoTime() : 0; + executeNoResult(queryText); + queryTime = doTrace ? 
System.nanoTime() : 0; + timingTrace(doTrace, queryText, start, queryTime); + Deadline.checkTimeout(); + + // Drop the partitions + queryText = "delete from " + PARTITIONS + " where \"PART_ID\" in (" + partitionIds + ")"; + start = doTrace ? System.nanoTime() : 0; + executeNoResult(queryText); + queryTime = doTrace ? System.nanoTime() : 0; + timingTrace(doTrace, queryText, start, queryTime); + Deadline.checkTimeout(); + } catch (SQLException sqlException) { + LOG.warn("SQL error executing query while dropping partition", sqlException); + throw new MetaException("Encountered error while dropping partitions."); + } + dropStorageDescriptors(sdIdList); + dropSerdes(serdeIdList); + dropDanglingColumnDescriptors(columnDescriptorIdList); + } + + /** + * Drops SD-s. Should be called with the list short enough to not trip up Oracle/etc. + * @param storageDescriptorIdList The storage descriptor identifiers to drop + * @throws MetaException If there is an SQL exception during the execution, it is converted to a + * MetaException + */ + private void dropStorageDescriptors(List storageDescriptorIdList) throws MetaException { + String queryText; + long start, queryTime; + boolean doTrace = LOG.isDebugEnabled(); + String sdIds = getIdListForIn(storageDescriptorIdList); + + // Get the corresponding SKEWED_STRING_LIST_ID data + queryText = + "select " + SKEWED_VALUES + ".\"STRING_LIST_ID_EID\" " + + "from " + SKEWED_VALUES + " " + + "WHERE " + SKEWED_VALUES + ".\"SD_ID_OID\" in (" + sdIds + ")"; + + Query query = pm.newQuery("javax.jdo.query.SQL", queryText); + List sqlResult = ensureList(executeWithArray(query, null, queryText)); + + List skewedStringListIdList = new ArrayList<>(0); + + if (!sqlResult.isEmpty()) { + for (Object[] fields : sqlResult) { + skewedStringListIdList.add(extractSqlLong(fields[0])); + } + } + query.closeAll(); + + String skewedStringListIds = getIdListForIn(skewedStringListIdList); + + try { + // Drop the SD params + queryText = "delete from " + SD_PARAMS + " 
where \"SD_ID\" in (" + sdIds + ")"; + start = doTrace ? System.nanoTime() : 0; + executeNoResult(queryText); + queryTime = doTrace ? System.nanoTime() : 0; + timingTrace(doTrace, queryText, start, queryTime); + Deadline.checkTimeout(); + + // Drop the sort cols + queryText = "delete from " + SORT_COLS + " where \"SD_ID\" in (" + sdIds + ")"; + start = doTrace ? System.nanoTime() : 0; + executeNoResult(queryText); + queryTime = doTrace ? System.nanoTime() : 0; + timingTrace(doTrace, queryText, start, queryTime); + Deadline.checkTimeout(); + + // Drop the bucketing cols + queryText = "delete from " + BUCKETING_COLS + " where \"SD_ID\" in (" + sdIds + ")"; + start = doTrace ? System.nanoTime() : 0; + executeNoResult(queryText); + queryTime = doTrace ? System.nanoTime() : 0; + timingTrace(doTrace, queryText, start, queryTime); + Deadline.checkTimeout(); + + // Drop the skewed string lists + if (skewedStringListIdList.size() > 0) { + // Drop the skewed string value loc map + queryText = "delete from " + SKEWED_COL_VALUE_LOC_MAP + " where \"SD_ID\" in (" + + sdIds + ")"; + start = doTrace ? System.nanoTime() : 0; + executeNoResult(queryText); + queryTime = doTrace ? System.nanoTime() : 0; + timingTrace(doTrace, queryText, start, queryTime); + Deadline.checkTimeout(); + + // Drop the skewed values + queryText = "delete from " + SKEWED_VALUES + " where \"SD_ID_OID\" in (" + sdIds + ")"; + start = doTrace ? System.nanoTime() : 0; + executeNoResult(queryText); + queryTime = doTrace ? System.nanoTime() : 0; + timingTrace(doTrace, queryText, start, queryTime); + Deadline.checkTimeout(); + + // Drop the skewed string list values + queryText = "delete from " + SKEWED_STRING_LIST_VALUES + " where \"STRING_LIST_ID\" in (" + + skewedStringListIds + ")"; + start = doTrace ? System.nanoTime() : 0; + executeNoResult(queryText); + queryTime = doTrace ? 
System.nanoTime() : 0; + timingTrace(doTrace, queryText, start, queryTime); + Deadline.checkTimeout(); + + // Drop the skewed string list + queryText = "delete from " + SKEWED_STRING_LIST + " where \"STRING_LIST_ID\" in (" + + skewedStringListIds + ")"; + start = doTrace ? System.nanoTime() : 0; + executeNoResult(queryText); + queryTime = doTrace ? System.nanoTime() : 0; + timingTrace(doTrace, queryText, start, queryTime); + Deadline.checkTimeout(); + } + + // Drop the skewed cols + queryText = "delete from " + SKEWED_COL_NAMES + " where \"SD_ID\" in (" + sdIds + ")"; + start = doTrace ? System.nanoTime() : 0; + executeNoResult(queryText); + queryTime = doTrace ? System.nanoTime() : 0; + timingTrace(doTrace, queryText, start, queryTime); + Deadline.checkTimeout(); + + // Drop the sds + queryText = "delete from " + SDS + " where \"SD_ID\" in (" + sdIds + ")"; + start = doTrace ? System.nanoTime() : 0; + executeNoResult(queryText); + queryTime = doTrace ? System.nanoTime() : 0; + timingTrace(doTrace, queryText, start, queryTime); + Deadline.checkTimeout(); + + } catch (SQLException sqlException) { + LOG.warn("SQL error executing query while dropping storage descriptor.", sqlException); + throw new MetaException("Encountered error while dropping storage descriptor."); + } + } + + /** + * Drops Serde-s. Should be called with the list short enough to not trip up Oracle/etc. + * @param serdeIdList The serde identifiers to drop + * @throws MetaException If there is an SQL exception during the execution, it is converted to a + * MetaException + */ + private void dropSerdes(List serdeIdList) throws MetaException { + String queryText; + long start, queryTime; + boolean doTrace = LOG.isDebugEnabled(); + String serdeIds = getIdListForIn(serdeIdList); + + try { + // Drop the serde params + queryText = "delete from " + SERDE_PARAMS + " where \"SERDE_ID\" in (" + serdeIds + ")"; + start = doTrace ? System.nanoTime() : 0; + executeNoResult(queryText); + queryTime = doTrace ? 
System.nanoTime() : 0; + timingTrace(doTrace, queryText, start, queryTime); + Deadline.checkTimeout(); + + // Drop the serdes + queryText = "delete from " + SERDES + " where \"SERDE_ID\" in (" + serdeIds + ")"; + start = doTrace ? System.nanoTime() : 0; + executeNoResult(queryText); + queryTime = doTrace ? System.nanoTime() : 0; + timingTrace(doTrace, queryText, start, queryTime); + Deadline.checkTimeout(); + } catch (SQLException sqlException) { + LOG.warn("SQL error executing query while dropping serde.", sqlException); + throw new MetaException("Encountered error while dropping serde."); + } + } + + /** + * Checks whether the column descriptors are still referenced by other SD-s. If not, removes + * them. Should be called with the list short enough to not trip up Oracle/etc. + * @param columnDescriptorIdList The column descriptor identifiers + * @throws MetaException If there is an SQL exception during the execution, it is converted to a + * MetaException + */ + private void dropDanglingColumnDescriptors(List columnDescriptorIdList) + throws MetaException { + String queryText; + long start, queryTime; + boolean doTrace = LOG.isDebugEnabled(); + String colIds = getIdListForIn(columnDescriptorIdList); + + // Drop column descriptor, if no relation left + queryText = + "SELECT " + SDS + ".\"CD_ID\", count(1) " + + "from " + SDS + " " + + "WHERE " + SDS + ".\"CD_ID\" in (" + colIds + ") " + + "GROUP BY " + SDS + ".\"CD_ID\""; + Query query = pm.newQuery("javax.jdo.query.SQL", queryText); + List sqlResult = ensureList(executeWithArray(query, null, queryText)); + + List danglingColumnDescriptorIdList = new ArrayList<>(columnDescriptorIdList.size()); + if (!sqlResult.isEmpty()) { + for (Object[] fields : sqlResult) { + if (extractSqlInt(fields[1]) == 0) { + danglingColumnDescriptorIdList.add(extractSqlLong(fields[0])); + } + } + } + query.closeAll(); + + if (danglingColumnDescriptorIdList.size() > 0) { + try { + String danglingCDIds = getIdListForIn(danglingColumnDescriptorIdList); + + 
// Drop the columns_v2 + queryText = "delete from " + COLUMNS_V2 + " where \"CD_ID\" in (" + danglingCDIds + ")"; + start = doTrace ? System.nanoTime() : 0; + executeNoResult(queryText); + queryTime = doTrace ? System.nanoTime() : 0; + timingTrace(doTrace, queryText, start, queryTime); + Deadline.checkTimeout(); + + // Drop the cols + queryText = "delete from " + CDS + " where \"CD_ID\" in (" + danglingCDIds + ")"; + start = doTrace ? System.nanoTime() : 0; + executeNoResult(queryText); + queryTime = doTrace ? System.nanoTime() : 0; + timingTrace(doTrace, queryText, start, queryTime); + Deadline.checkTimeout(); + } catch (SQLException sqlException) { + LOG.warn("SQL error executing query while dropping dangling col descriptions", sqlException); + throw new MetaException("Encountered error while dropping col descriptions"); + } + } + } } diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java index 184ecb6..1c4c451 100644 --- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -2534,6 +2534,22 @@ public void dropPartitions(String catName, String dbName, String tblName, List(catName, dbName, tblName, true, true) { + @Override + protected List getSqlResult(GetHelper> ctx) throws MetaException { + directSql.dropPartitionsViaSqlFilter(catName, dbName, tblName, partNames); + return Collections.emptyList(); + } + @Override + protected List getJdoResult(GetHelper> ctx) throws MetaException { + dropPartitionsViaJdo(catName, dbName, tblName, partNames); + return Collections.emptyList(); + } + }.run(false); + } + + private void dropPartitionsViaJdo(String catName, String dbName, String tblName, + List partNames) throws MetaException { boolean success = false; openTransaction(); try { @@ -2542,7 +2558,7 @@ public void 
dropPartitions(String catName, String dbName, String tblName, List