diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Batchable.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Batchable.java new file mode 100644 index 0000000000..e0d178ff0f --- /dev/null +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Batchable.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore; + +import java.util.ArrayList; +import java.util.List; +import javax.jdo.Query; + +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Base class to add the batch process for DirectSQL or RawStore queries. + * 1. Provide the implementation of run() to process one batch + * 2. 
Call Batchable.runBatched() to process the whole dataset + * + * I: input type, R: result type + */ +public abstract class Batchable<I, R> { + private static final Logger LOG = LoggerFactory.getLogger(Batchable.class); + public static final int NO_BATCHING = -1; + + private List<Query> queries = null; + public abstract List<R> run(List<I> input) throws MetaException; + + public void addQueryAfterUse(Query query) { + if (queries == null) { + queries = new ArrayList<Query>(1); + } + queries.add(query); + } + protected void addQueryAfterUse(Batchable<?, ?> b) { + if (b.queries == null) return; + if (queries == null) { + queries = new ArrayList<Query>(1); + } + queries.addAll(b.queries); + } + public void closeAllQueries() { + for (Query q : queries) { + try { + q.closeAll(); + } catch (Throwable t) { + LOG.error("Failed to close a query", t); + } + } + } + + public static <I, R> List<R> runBatched( + final int batchSize, + List<I> input, + Batchable<I, R> runnable) throws MetaException { + if (batchSize == NO_BATCHING || batchSize >= input.size()) { + return runnable.run(input); + } + List<R> result = new ArrayList<R>(input.size()); + for (int fromIndex = 0, toIndex = 0; toIndex < input.size(); fromIndex = toIndex) { + toIndex = Math.min(fromIndex + batchSize, input.size()); + List<I> batchedInput = input.subList(fromIndex, toIndex); + List<R> batchedOutput = runnable.run(batchedInput); + if (batchedOutput != null) { + result.addAll(batchedOutput); + } + } + return result; + } +} \ No newline at end of file diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java index 6ead20aeaf..da3e68721e 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java @@ -445,7 +445,7 @@ public Database getDatabase(String dbName) throws MetaException{ if
(partNames.isEmpty()) { return Collections.emptyList(); } - return runBatched(partNames, new Batchable() { + return Batchable.runBatched(batchSize, partNames, new Batchable() { @Override public List run(List input) throws MetaException { String filter = "" + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(input.size()) + ")"; @@ -579,7 +579,7 @@ private boolean isViewTable(String dbName, String tblName) throws MetaException } // Get full objects. For Oracle/etc. do it in batches. - List result = runBatched(sqlResult, new Batchable() { + List result = Batchable.runBatched(batchSize, sqlResult, new Batchable() { @Override public List run(List input) throws MetaException { return getPartitionsFromPartitionIds(dbNameLcase, tblNameLcase, isView, input); @@ -1346,7 +1346,7 @@ public ColumnStatistics getTableStats(final String dbName, final String tableNam return ensureList(qResult); } }; - List list = runBatched(colNames, b); + List list = Batchable.runBatched(batchSize, colNames, b); if (list.isEmpty()) { return null; } @@ -1430,10 +1430,10 @@ private long partsFoundForPartitions(final String dbName, final String tableName + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ? 
" + " and \"COLUMN_NAME\" in (%1$s) and \"PARTITION_NAME\" in (%2$s)" + " group by \"PARTITION_NAME\""; - List allCounts = runBatched(colNames, new Batchable() { + List allCounts = Batchable.runBatched(batchSize, colNames, new Batchable() { @Override public List run(final List inputColName) throws MetaException { - return runBatched(partNames, new Batchable() { + return Batchable.runBatched(batchSize, partNames, new Batchable() { @Override public List run(List inputPartNames) throws MetaException { long partsFound = 0; @@ -1472,10 +1472,10 @@ private long partsFoundForPartitions(final String dbName, final String tableName final String tableName, final List partNames, List colNames, long partsFound, final boolean useDensityFunctionForNDVEstimation, final double ndvTuner, final boolean enableBitVector) throws MetaException { final boolean areAllPartsFound = (partsFound == partNames.size()); - return runBatched(colNames, new Batchable() { + return Batchable.runBatched(batchSize, colNames, new Batchable() { @Override public List run(final List inputColNames) throws MetaException { - return runBatched(partNames, new Batchable() { + return Batchable.runBatched(batchSize, partNames, new Batchable() { @Override public List run(List inputPartNames) throws MetaException { return columnStatisticsObjForPartitionsBatch(dbName, tableName, inputPartNames, @@ -1884,13 +1884,13 @@ private ColumnStatisticsObj prepareCSObjWithAdjustedNDV(Object[] row, int i, } }; try { - return runBatched(partNames, b2); + return Batchable.runBatched(batchSize, partNames, b2); } finally { addQueryAfterUse(b2); } } }; - List list = runBatched(colNames, b); + List list = Batchable.runBatched(batchSize, colNames, b); List result = new ArrayList( Math.min(list.size(), partNames.size())); @@ -1991,50 +1991,6 @@ public void prepareTxn() throws MetaException { } } - - private static abstract class Batchable { - private List queries = null; - public abstract List run(List input) throws MetaException; - 
public void addQueryAfterUse(Query query) { - if (queries == null) { - queries = new ArrayList(1); - } - queries.add(query); - } - protected void addQueryAfterUse(Batchable b) { - if (b.queries == null) return; - if (queries == null) { - queries = new ArrayList(1); - } - queries.addAll(b.queries); - } - public void closeAllQueries() { - for (Query q : queries) { - try { - q.closeAll(); - } catch (Throwable t) { - LOG.error("Failed to close a query", t); - } - } - } - } - - private List runBatched(List input, Batchable runnable) throws MetaException { - if (batchSize == NO_BATCHING || batchSize >= input.size()) { - return runnable.run(input); - } - List result = new ArrayList(input.size()); - for (int fromIndex = 0, toIndex = 0; toIndex < input.size(); fromIndex = toIndex) { - toIndex = Math.min(fromIndex + batchSize, input.size()); - List batchedInput = input.subList(fromIndex, toIndex); - List batchedOutput = runnable.run(batchedInput); - if (batchedOutput != null) { - result.addAll(batchedOutput); - } - } - return result; - } - public List getForeignKeys(String parent_db_name, String parent_tbl_name, String foreign_db_name, String foreign_tbl_name) throws MetaException { List ret = new ArrayList(); String queryText = diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java index 88d88ed4df..c0431824dc 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -234,6 +234,7 @@ private static Properties prop = null; private static PersistenceManagerFactory pmf = null; private static boolean forTwoMetastoreTesting = false; + private int batchSize = Batchable.NO_BATCHING; private static final DateTimeFormatter YMDHMS_FORMAT = DateTimeFormatter.ofPattern( "yyyy_MM_dd_HH_mm_ss"); @@ -375,6 +376,8 @@ public void 
setConf(Configuration conf) { directSqlErrors = Metrics.getOrCreateCounter(MetricsConstants.DIRECTSQL_ERRORS); } + this.batchSize = MetastoreConf.getIntVar(conf, ConfVars.RAWSTORE_PARTITION_BATCH_SIZE); + if (!isInitialized) { throw new RuntimeException( "Unable to create persistence manager. Check dss.log for details"); @@ -7719,24 +7722,31 @@ public boolean updatePartitionColumnStatistics(ColumnStatistics colStats, List result = null; validateTableCols(table, colNames); Query query = queryWrapper.query = pm.newQuery(MTableColumnStatistics.class); - String filter = "tableName == t1 && dbName == t2 && ("; - String paramStr = "java.lang.String t1, java.lang.String t2"; - Object[] params = new Object[colNames.size() + 2]; - params[0] = table.getTableName(); - params[1] = table.getDbName(); - for (int i = 0; i < colNames.size(); ++i) { - filter += ((i == 0) ? "" : " || ") + "colName == c" + i; - paramStr += ", java.lang.String c" + i; - params[i + 2] = colNames.get(i); - } - filter += ")"; - query.setFilter(filter); - query.declareParameters(paramStr); - result = (List) query.executeWithArray(params); - pm.retrieveAll(result); + List result = Batchable.runBatched(batchSize, colNames, new Batchable() { + @Override + public List run(List input) + throws MetaException { + String filter = "tableName == t1 && dbName == t2 && ("; + String paramStr = "java.lang.String t1, java.lang.String t2"; + Object[] params = new Object[input.size() + 2]; + params[0] = table.getTableName(); + params[1] = table.getDbName(); + for (int i = 0; i < input.size(); ++i) { + filter += ((i == 0) ? 
"" : " || ") + "colName == c" + i; + paramStr += ", java.lang.String c" + i; + params[i + 2] = input.get(i); + } + filter += ")"; + query.setFilter(filter); + query.declareParameters(paramStr); + List paritial = (List) query.executeWithArray(params); + pm.retrieveAll(paritial); + return paritial; + } + }); + if (result.size() > colNames.size()) { throw new MetaException("Unexpected " + result.size() + " statistics for " + colNames.size() + " columns"); diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index 9f822564bd..c4224f60bd 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -836,6 +836,11 @@ public static ConfVars getMetaConf(String name) { "hive.metastore.wm.default.pool.size", 4, "The size of a default pool to create when creating an empty resource plan;\n" + "If not positive, no default pool will be created."), + RAWSTORE_PARTITION_BATCH_SIZE("metastore.rawstore.batch.size", + "metastore.rawstore.batch.size", -1, + "Batch size for partition and other object retrieval from the underlying DB in JDO.\n" + + "The JDO implementation such as DataNucleus may run into issues when the generated queris are\n" + + "too large. Use this parameter to break the query into multiple batches. -1 means no batching."), // Hive values we have copied and use as is // These two are used to indicate that we are running tests