From 05a6d59b737acad8056876b07dd9f9c2bf7ef38e Mon Sep 17 00:00:00 2001 From: Ashutosh Chauhan Date: Sun, 17 May 2020 09:58:19 -0700 Subject: [PATCH] HIVE-23281 : ObjectStore::convertToStorageDescriptor can be optimised to reduce calls to DB for ACID tables --- .../hive/ql/parse/SemanticAnalyzer.java | 10 ++- .../clientpositive/llap/acid_nullscan.q.out | 10 +-- .../llap/acid_table_directories_test.q.out | 1 - .../hive/metastore/MetaStoreDirectSql.java | 37 ++++++--- .../hadoop/hive/metastore/ObjectStore.java | 83 ++++++++++--------- 5 files changed, 78 insertions(+), 63 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index aa8d84ec9c..389b930bad 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -343,7 +343,7 @@ * that describes percentage and number. */ private final Map nameToSplitSample; - private Map> groupOpToInputTables; + private final Map> groupOpToInputTables; protected Map prunedPartitions; protected List resultSchema; protected CreateViewDesc createVwDesc; @@ -7440,12 +7440,14 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) currentTableId = destTableId; destTableId++; - lbCtx = constructListBucketingCtx(destinationPartition.getSkewedColNames(), - destinationPartition.getSkewedColValues(), destinationPartition.getSkewedColValueLocationMaps(), - destinationPartition.isStoredAsSubDirectories()); if (destTableIsTransactional) { acidOp = getAcidType(tableDescriptor.getOutputFileFormatClass(), dest, isMmTable); checkAcidConstraints(); + } else { + // Acid tables can't be list bucketed or have skewed cols + lbCtx = constructListBucketingCtx(destinationPartition.getSkewedColNames(), + destinationPartition.getSkewedColValues(), destinationPartition.getSkewedColValueLocationMaps(), + destinationPartition.isStoredAsSubDirectories()); } try { if (ctx.getExplainConfig() != null) { diff --git a/ql/src/test/results/clientpositive/llap/acid_nullscan.q.out b/ql/src/test/results/clientpositive/llap/acid_nullscan.q.out index 85d58dddb4..e97b6ee0a4 100644 --- a/ql/src/test/results/clientpositive/llap/acid_nullscan.q.out +++ b/ql/src/test/results/clientpositive/llap/acid_nullscan.q.out @@ -91,7 +91,6 @@ STAGE PLANS: properties: COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"a":"true","b":"true"}} bucket_count 2 - bucket_field_name a bucketing_version 2 column.name.delimiter , columns a,b @@ -99,13 +98,13 @@ STAGE PLANS: columns.types int:string #### A masked pattern was here #### name default.acid_vectorized_n1 - numFiles 3 + numFiles 2 numRows 11 rawDataSize 0 serialization.ddl struct acid_vectorized_n1 { i32 a, string b} serialization.format 1 serialization.lib org.apache.hadoop.hive.serde2.NullStructSerDe - totalSize 2583 + totalSize 1679 transactional true transactional_properties default #### A masked pattern was here #### @@ -116,7 +115,6 @@ STAGE PLANS: properties: COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"a":"true","b":"true"}} bucket_count 2 - bucket_field_name a bucketing_version 2 column.name.delimiter , columns a,b @@ -124,13 +122,13 @@ STAGE PLANS: columns.types int:string #### A masked pattern was here #### name default.acid_vectorized_n1 - numFiles 3 + numFiles 2 numRows 11 rawDataSize 0 serialization.ddl struct acid_vectorized_n1 { i32 a, string b} serialization.format 1 serialization.lib org.apache.hadoop.hive.ql.io.orc.OrcSerde - totalSize 2583 + totalSize 1679 transactional true transactional_properties default #### A masked pattern was here #### diff --git a/ql/src/test/results/clientpositive/llap/acid_table_directories_test.q.out b/ql/src/test/results/clientpositive/llap/acid_table_directories_test.q.out index 1fec3617ad..138ad24bd6 100644 --- a/ql/src/test/results/clientpositive/llap/acid_table_directories_test.q.out +++ b/ql/src/test/results/clientpositive/llap/acid_table_directories_test.q.out @@ -160,7 +160,6 @@ POSTHOOK: Input: default@acidparttbl@p=200 ### ACID DELTA DIR ### ### ACID DELTA DIR ### ### ACID DELTA DIR ### -### ACID DELTA DIR ### #### A masked pattern was here #### ### ACID BASE DIR ### ### ACID BASE DIR ### diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java index b69277e5a9..8b7e04d1e9 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java @@ -91,6 +91,7 @@ import org.apache.hadoop.hive.metastore.parser.ExpressionTree.Operator; import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeNode; import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeVisitor; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils; import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.ColStatsObjWithSourceInfo; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; @@ -531,7 +532,7 @@ public Database getDatabase(String catName, String dbName) throws MetaException{ * @return List of partitions. */ public List getPartitionsViaSqlFilter(String catName, String dbName, String tableName, - SqlFilterForPushdown filter, Integer max) throws MetaException { + SqlFilterForPushdown filter, Integer max, boolean isTxnTable) throws MetaException { List partitionIds = getPartitionIdsViaSqlFilter(catName, dbName, tableName, filter.filter, filter.params, filter.joins, max); @@ -542,7 +543,7 @@ public Database getDatabase(String catName, String dbName) throws MetaException{ @Override public List run(List input) throws MetaException { return getPartitionsFromPartitionIds(catName, dbName, - tableName, null, input, Collections.emptyList()); + tableName, null, input, Collections.emptyList(), isTxnTable); } }); } @@ -776,6 +777,13 @@ private boolean isViewTable(String catName, String dbName, String tblName) throw /** Should be called with the list short enough to not trip up Oracle/etc. */ private List getPartitionsFromPartitionIds(String catName, String dbName, String tblName, Boolean isView, List partIdList, List projectionFields) throws MetaException { + return getPartitionsFromPartitionIds(catName, dbName, tblName, isView, partIdList, projectionFields, false); + } + + /** Should be called with the list short enough to not trip up Oracle/etc. */ + private List getPartitionsFromPartitionIds(String catName, String dbName, String tblName, + Boolean isView, List partIdList, List projectionFields, + boolean isTxnTable) throws MetaException { boolean doTrace = LOG.isDebugEnabled(); @@ -914,13 +922,15 @@ private boolean isViewTable(String catName, String dbName, String tblName) throw // Get all the stuff for SD. Don't do empty-list check - we expect partitions do have SDs. MetastoreDirectSqlUtils.setSDParameters(SD_PARAMS, convertMapNullsToEmptyStrings, pm, sds, sdIds); - MetastoreDirectSqlUtils.setSDSortCols(SORT_COLS, pm, sds, sdIds); + boolean hasSkewedColumns = false; + if (!isTxnTable) { + MetastoreDirectSqlUtils.setSDSortCols(SORT_COLS, pm, sds, sdIds); - MetastoreDirectSqlUtils.setSDBucketCols(BUCKETING_COLS, pm, sds, sdIds); + MetastoreDirectSqlUtils.setSDBucketCols(BUCKETING_COLS, pm, sds, sdIds); - // Skewed columns stuff. - boolean hasSkewedColumns = MetastoreDirectSqlUtils - .setSkewedColNames(SKEWED_COL_NAMES, pm, sds, sdIds); + // Skewed columns stuff. + hasSkewedColumns = MetastoreDirectSqlUtils.setSkewedColNames(SKEWED_COL_NAMES, pm, sds, sdIds); + } // Assume we don't need to fetch the rest of the skewed column data if we have no columns. if (hasSkewedColumns) { @@ -940,8 +950,9 @@ private boolean isViewTable(String catName, String dbName, String tblName) throw } // Finally, get all the stuff for serdes - just the params. - MetastoreDirectSqlUtils - .setSerdeParams(SERDE_PARAMS, convertMapNullsToEmptyStrings, pm, serdes, serdeIds); + if (!isTxnTable) { + MetastoreDirectSqlUtils.setSerdeParams(SERDE_PARAMS, convertMapNullsToEmptyStrings, pm, serdes, serdeIds); + } return orderedResult; } @@ -987,10 +998,10 @@ private static String trimCommaList(StringBuilder sb) { } private static class PartitionFilterGenerator extends TreeVisitor { - private String catName; - private String dbName; - private String tableName; - private List partitionKeys; + private final String catName; + private final String dbName; + private final String tableName; + private final List partitionKeys; private final FilterBuilder filterBuffer; private final List params; private final List joins; diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java index 57488415ed..d192972481 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -1939,10 +1939,12 @@ private Table convertToTable(MTable mtbl) throws MetaException { tableType = TableType.MANAGED_TABLE.toString(); } } + Map parameters = convertMap(mtbl.getParameters()); + boolean isTxnTable = TxnUtils.isTransactionalTable(parameters); final Table t = new Table(mtbl.getTableName(), mtbl.getDatabase().getName(), mtbl .getOwner(), mtbl.getCreateTime(), mtbl.getLastAccessTime(), mtbl - .getRetention(), convertToStorageDescriptor(mtbl.getSd()), - convertToFieldSchemas(mtbl.getPartitionKeys()), convertMap(mtbl.getParameters()), + .getRetention(), convertToStorageDescriptor(mtbl.getSd(), false, parameters, isTxnTable), + convertToFieldSchemas(mtbl.getPartitionKeys()), parameters, mtbl.getViewOriginalText(), mtbl.getViewExpandedText(), tableType); if (Strings.isNullOrEmpty(mtbl.getOwnerType())) { @@ -2095,34 +2097,31 @@ private MColumnDescriptor createNewMColumnDescriptor(List cols) { return new MColumnDescriptor(cols); } - // MSD and SD should be same objects. Not sure how to make then same right now - // MSerdeInfo *& SerdeInfo should be same as well private StorageDescriptor convertToStorageDescriptor( - MStorageDescriptor msd, - boolean noFS) throws MetaException { + MStorageDescriptor msd, boolean noFS, + Map parameters, boolean isTxnTable) throws MetaException { if (msd == null) { return null; } List mFieldSchemas = msd.getCD() == null ? null : msd.getCD().getCols(); + List orderList = (isTxnTable) ? Collections.emptyList() : convertToOrders(msd.getSortCols()); + List bucList = convertList(msd.getBucketCols()); + StorageDescriptor sd = new StorageDescriptor(noFS ? null : convertToFieldSchemas(mFieldSchemas), msd.getLocation(), msd.getInputFormat(), msd.getOutputFormat(), msd - .isCompressed(), msd.getNumBuckets(), convertToSerDeInfo(msd - .getSerDeInfo(), true), convertList(msd.getBucketCols()), convertToOrders(msd - .getSortCols()), convertMap(msd.getParameters())); - SkewedInfo skewedInfo = new SkewedInfo(convertList(msd.getSkewedColNames()), - convertToSkewedValues(msd.getSkewedColValues()), - covertToSkewedMap(msd.getSkewedColValueLocationMaps())); - sd.setSkewedInfo(skewedInfo); + .isCompressed(), msd.getNumBuckets(), convertToSerDeInfo(msd.getSerDeInfo(), true), + bucList , orderList, parameters); + if (!isTxnTable) { + SkewedInfo skewedInfo = new SkewedInfo(convertList(msd.getSkewedColNames()), + convertToSkewedValues(msd.getSkewedColValues()), + covertToSkewedMap(msd.getSkewedColValueLocationMaps())); + sd.setSkewedInfo(skewedInfo); + } sd.setStoredAsSubDirectories(msd.isStoredAsSubDirectories()); return sd; } - private StorageDescriptor convertToStorageDescriptor(MStorageDescriptor msd) - throws MetaException { - return convertToStorageDescriptor(msd, false); - } - /** * Convert a list of MStringList to a list of list string */ @@ -2453,7 +2452,7 @@ public Partition getPartition(String catName, String dbName, String tableName, openTransaction(); MTable table = this.getMTable(catName, dbName, tableName); MPartition mpart = getMPartition(catName, dbName, tableName, part_vals); - part = convertToPart(mpart); + part = convertToPart(mpart, false); committed = commitTransaction(); if (part == null) { throw new NoSuchObjectException("partition values=" @@ -2616,7 +2615,7 @@ private MPartition convertToMPart(Partition part, MTable mt, boolean useTableCD) msd, part.getParameters()); } - private Partition convertToPart(MPartition mpart) throws MetaException { + private Partition convertToPart(MPartition mpart, boolean isTxnTable) throws MetaException { if (mpart == null) { return null; } @@ -2627,22 +2626,25 @@ private Partition convertToPart(MPartition mpart) throws MetaException { String tableName = table == null ? null : table.getTableName(); String catName = table == null ? null : table.getDatabase() == null ? null : table.getDatabase().getCatalogName(); + Map params = convertMap(mpart.getParameters()); Partition p = new Partition(convertList(mpart.getValues()), dbName, tableName, mpart.getCreateTime(), - mpart.getLastAccessTime(), convertToStorageDescriptor(mpart.getSd()), - convertMap(mpart.getParameters())); + mpart.getLastAccessTime(), convertToStorageDescriptor(mpart.getSd(), false, params, isTxnTable), + params); p.setCatName(catName); p.setWriteId(mpart.getWriteId()); return p; } - private Partition convertToPart(String catName, String dbName, String tblName, MPartition mpart) + private Partition convertToPart(String catName, String dbName, String tblName, + MPartition mpart, boolean isTxnTable) throws MetaException { if (mpart == null) { return null; } + Map params = convertMap(mpart.getParameters()); Partition p = new Partition(convertList(mpart.getValues()), dbName, tblName, mpart.getCreateTime(), mpart.getLastAccessTime(), - convertToStorageDescriptor(mpart.getSd(), false), convertMap(mpart.getParameters())); + convertToStorageDescriptor(mpart.getSd(), false, params, isTxnTable), params); p.setCatName(catName); p.setWriteId(mpart.getWriteId()); return p; @@ -2873,7 +2875,7 @@ private boolean dropPartitionCommon(MPartition part) throws MetaException, if (CollectionUtils.isNotEmpty(mparts)) { for (MPartition mpart : mparts) { MTable mtbl = mpart.getTable(); - Partition part = convertToPart(mpart); + Partition part = convertToPart(mpart, false); parts.add(part); if ("TRUE".equalsIgnoreCase(mtbl.getParameters().get("PARTITION_LEVEL_PRIVILEGE"))) { @@ -2907,7 +2909,7 @@ public Partition getPartitionWithAuth(String catName, String dbName, String tblN } Partition part = null; MTable mtbl = mpart.getTable(); - part = convertToPart(mpart); + part = convertToPart(mpart, false); if ("TRUE".equalsIgnoreCase(mtbl.getParameters().get("PARTITION_LEVEL_PRIVILEGE"))) { String partName = Warehouse.makePartName(this.convertToFieldSchemas(mtbl .getPartitionKeys()), partVals); @@ -2938,17 +2940,18 @@ public Partition getPartitionWithAuth(String catName, String dbName, String tblN dest = new ArrayList<>(src.size()); } for (MPartition mp : src) { - dest.add(convertToPart(mp)); + dest.add(convertToPart(mp, false)); Deadline.checkTimeout(); } return dest; } - private List convertToParts(String catName, String dbName, String tblName, List mparts) + private List convertToParts(String catName, String dbName, String tblName, + List mparts, boolean isTxnTable) throws MetaException { List parts = new ArrayList<>(mparts.size()); for (MPartition mp : mparts) { - parts.add(convertToPart(catName, dbName, tblName, mp)); + parts.add(convertToPart(catName, dbName, tblName, mp, isTxnTable)); Deadline.checkTimeout(); } return parts; @@ -3302,7 +3305,7 @@ private Collection getPartitionPsQueryResults(String catName, String dbName, Str part_vals, max_parts, null, queryWrapper); MTable mtbl = getMTable(catName, db_name, tbl_name); for (Object o : parts) { - Partition part = convertToPart((MPartition) o); + Partition part = convertToPart((MPartition) o, false); //set auth privileges if (null != userName && null != groupNames && "TRUE".equalsIgnoreCase(mtbl.getParameters().get("PARTITION_LEVEL_PRIVILEGE"))) { @@ -3443,7 +3446,7 @@ private Collection getPartitionPsQueryResults(String catName, String dbName, Str @Override protected List getJdoResult( GetHelper> ctx) throws MetaException, NoSuchObjectException { - return getPartitionsViaOrmFilter(catName, dbName, tblName, partNames); + return getPartitionsViaOrmFilter(catName, dbName, tblName, partNames, false); } }.run(false); } @@ -3470,6 +3473,7 @@ protected boolean getPartitionsByExprInternal(String catName, String dbName, Str MTable mTable = ensureGetMTable(catName, dbName, tblName); List partitionKeys = convertToFieldSchemas(mTable.getPartitionKeys()); + boolean isTxnTbl = TxnUtils.isTransactionalTable(convertMap(mTable.getParameters())); result.addAll(new GetListHelper(catName, dbName, tblName, allowSql, allowJdo) { @Override protected List getSqlResult(GetHelper> ctx) throws MetaException { @@ -3479,7 +3483,7 @@ protected boolean getPartitionsByExprInternal(String catName, String dbName, Str if (directSql.generateSqlFilterForPushdown(catName, dbName, tblName, partitionKeys, exprTree, defaultPartitionName, filter)) { String catalogName = (catName != null) ? catName : DEFAULT_CATALOG_NAME; - return directSql.getPartitionsViaSqlFilter(catalogName, dbName, tblName, filter, null); + return directSql.getPartitionsViaSqlFilter(catalogName, dbName, tblName, filter, null, isTxnTbl); } } // We couldn't do SQL filter pushdown. Get names via normal means. @@ -3502,7 +3506,7 @@ protected boolean getPartitionsByExprInternal(String catName, String dbName, Str List partNames = new ArrayList<>(); hasUnknownPartitions.set(getPartitionNamesPrunedByExprNoTxn( catName, dbName, tblName, partitionKeys, expr, defaultPartitionName, maxParts, partNames)); - result = getPartitionsViaOrmFilter(catName, dbName, tblName, partNames); + result = getPartitionsViaOrmFilter(catName, dbName, tblName, partNames, isTxnTbl); } return result; } @@ -3611,7 +3615,7 @@ private Integer getNumPartitionsViaOrmFilter(String catName, String dbName, Stri * @return Resulting partitions. */ private List getPartitionsViaOrmFilter(String catName, String dbName, String tblName, - List partNames) throws MetaException { + List partNames, boolean isTxnTable) throws MetaException { if (partNames.isEmpty()) { return Collections.emptyList(); @@ -3628,7 +3632,7 @@ private Integer getNumPartitionsViaOrmFilter(String catName, String dbName, Stri query.setOrdering("partitionName ascending"); List mparts = (List) query.executeWithMap(queryWithParams.getRight()); - List partitions = convertToParts(catName, dbName, tblName, mparts); + List partitions = convertToParts(catName, dbName, tblName, mparts, isTxnTable); query.closeAll(); return partitions; @@ -3979,7 +3983,7 @@ public int getNumPartitionsByFilter(String catName, String dbName, String tblNam tblName = normalizeIdentifier(tblName); MTable mTable = ensureGetMTable(catName, dbName, tblName); List partitionKeys = convertToFieldSchemas(mTable.getPartitionKeys()); - + Map parameters = mTable.getParameters(); return new GetHelper(catName, dbName, tblName, true, true) { private final SqlFilterForPushdown filter = new SqlFilterForPushdown(); @@ -4070,6 +4074,7 @@ protected Integer getJdoResult( MTable mTable = ensureGetMTable(catName, dbName, tblName); List partitionKeys = convertToFieldSchemas(mTable.getPartitionKeys()); + boolean isTxnTable = TxnUtils.isTransactionalTable(convertMap(mTable.getParameters())); final ExpressionTree tree = (filter != null && !filter.isEmpty()) ? PartFilterExprUtil.getFilterParser(filter).tree : ExpressionTree.EMPTY_TREE; return new GetListHelper(catName, dbName, tblName, allowSql, allowJdo) { @@ -4082,7 +4087,7 @@ protected boolean canUseDirectSql(GetHelper> ctx) throws MetaExc @Override protected List getSqlResult(GetHelper> ctx) throws MetaException { - return directSql.getPartitionsViaSqlFilter(catName, dbName, tblName, filter, (maxParts < 0) ? null : (int)maxParts); + return directSql.getPartitionsViaSqlFilter(catName, dbName, tblName, filter, (maxParts < 0) ? null : (int)maxParts, isTxnTable); } @Override @@ -4584,7 +4589,7 @@ private Partition alterPartitionNoTxn(String catName, String dbname, } oldCd.t = oldCD; - return convertToPart(oldp); + return convertToPart(oldp, false); } @Override @@ -8819,7 +8824,7 @@ private void writeMPartitionColumnStatistics(Table table, Partition partition, String catName = statsDesc.isSetCatName() ? statsDesc.getCatName() : getDefaultCatalog(conf); Table table = ensureGetTable(catName, statsDesc.getDbName(), statsDesc.getTableName()); Partition partition = convertToPart(getMPartition( - catName, statsDesc.getDbName(), statsDesc.getTableName(), partVals)); + catName, statsDesc.getDbName(), statsDesc.getTableName(), partVals), false); List colNames = new ArrayList<>(); for(ColumnStatisticsObj statsObj : statsObjs) { -- 2.17.2 (Apple Git-113)