From 044e7884b012d49b28a3971b686991160e708f53 Mon Sep 17 00:00:00 2001 From: Alexander Kolbasov Date: Fri, 13 Oct 2017 18:33:51 -0700 Subject: [PATCH 1/1] HIVE-17736: ObjectStore transaction handling can be simplified --- .../apache/hadoop/hive/metastore/ObjectStore.java | 987 +++++++++------------ .../hadoop/hive/metastore/tools/HiveMetaTool.java | 21 +- 2 files changed, 431 insertions(+), 577 deletions(-) diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java index d04a343913..0f7ba28682 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -258,20 +258,126 @@ private Counter directSqlErrors; /** - * A Autocloseable wrapper around Query class to pass the Query object to the caller and let the caller release - * the resources when the QueryWrapper goes out of scope + * AutoCloseable query wrapper which doesn't throw exceptions on close. + * In current JDO implementation Query itself is AutoCloseable, + * but its close() method throws exceptions. To avoid propafating these up the call stack, + * this wrapper provides close() method that doesn't throw exceptions.

+ * + * QueryWrapper simulates regular query operations. New ones may be added as needed. + * The typical access pattern is

+ * + *

+   *   try (QueryWrapper query = new QueryWrapper(pm.newQuery(Foo.class))) {
+   *     return query.execute();
+   *   }
+   * 
+ *

+ * Note that calling explicit close() is never useful because it can be called + * on the query itself instead. */ public static class QueryWrapper implements AutoCloseable { - public Query query; + private final Query query; /** - * Explicitly closes the query object to release the resources + * Initialize from existing query. + * @param query open query. */ + QueryWrapper(Query query) { + this.query = query; + } + + public Query getQuery() { + return query; + } + + // Wrappers around regular Query operations + + long deletePersistentAll(Object... parameters) { + return query.deletePersistentAll(parameters); + } + + void declareParameters(String parameters) { + query.declareParameters(parameters); + } + + void declareImports(String imports) { + query.declareImports(imports); + } + + Object execute() { + return query.execute(); + } + + Object execute(Object p1) { + return query.execute(p1); + } + + Object execute(Object p1, Object p2) { + return query.execute(p1, p2); + } + + Object execute(Object p1, Object p2, Object p3) { + return query.execute(p1, p2, p3); + } + + Object executeWithMap (Map parameters) { + return query.executeWithMap(parameters); + } + + Object executeWithArray (Object... parameters) { + return query.executeWithArray(parameters); + } + + void setFilter(String filter) { + query.setFilter(filter); + } + + void setOrdering(String ordering) { + query.setOrdering(ordering); + } + + void setRange(long fromIncl, long toExcl) { + query.setRange(fromIncl, toExcl); + } + + void setResult(String data) { + query.setResult(data); + } + + void setUnique(boolean unique) { + query.setUnique(unique); + } + + void setResultClass(Class cls) { + query.setResultClass(cls); + } + + void setClass(Class cls) { + query.setClass(cls); + } + @Override public void close() { - if (query != null) { - query.closeAll(); - query = null; + query.closeAll(); + } + } + + private class THandler implements AutoCloseable { + + THandler() { + openTransaction(); + } + + @Override + public void close() { + boolean committed = false; + + try { + committed = commitTransaction(); + } finally { + if (!committed) { + rollbackTransaction(); + } } } } @@ -890,10 +996,9 @@ public boolean alterDatabase(String dbName, Database db) @Override public boolean dropDatabase(String dbname) throws NoSuchObjectException, MetaException { - boolean success = false; - LOG.info("Dropping database " + dbname + " along with all tables"); + boolean committed = false; + LOG.info("Dropping database {} along with all tables", dbname); dbname = HiveStringUtils.normalizeIdentifier(dbname); - QueryWrapper queryWrapper = new QueryWrapper(); try { openTransaction(); @@ -901,17 +1006,19 @@ public boolean dropDatabase(String dbname) throws NoSuchObjectException, MetaExc MDatabase db = getMDatabase(dbname); pm.retrieve(db); if (db != null) { - List dbGrants = this.listDatabaseGrants(dbname, queryWrapper); - if (dbGrants != null && dbGrants.size() > 0) { + List dbGrants = this.listDatabaseGrants(dbname); + if (dbGrants != null && !dbGrants.isEmpty()) { pm.deletePersistentAll(dbGrants); } pm.deletePersistent(db); } - success = commitTransaction(); + committed = commitTransaction(); } finally { - rollbackAndCleanup(success, queryWrapper); + if (!committed) { + rollbackTransaction(); + } } - return success; + return committed; } @Override @@ -2215,12 +2322,7 @@ private boolean dropPartitionCommon(MPartition part) throws NoSuchObjectExceptio @Override protected List getJdoResult( GetHelper> ctx) throws MetaException { - QueryWrapper queryWrapper = new QueryWrapper(); - try { - return convertToParts(listMPartitions(dbName, tblName, maxParts, queryWrapper)); - } finally { - queryWrapper.close(); - } + return convertToParts(listMPartitions(dbName, tblName, maxParts)); } }.run(false); } @@ -2229,32 +2331,31 @@ private boolean dropPartitionCommon(MPartition part) throws NoSuchObjectExceptio public List getPartitionsWithAuth(String dbName, String tblName, short max, String userName, List groupNames) throws MetaException, InvalidObjectException { - boolean success = false; - QueryWrapper queryWrapper = new QueryWrapper(); + boolean committed = false; try { openTransaction(); - List mparts = listMPartitions(dbName, tblName, max, queryWrapper); - List parts = new ArrayList(mparts.size()); - if (mparts != null && mparts.size()>0) { - for (MPartition mpart : mparts) { - MTable mtbl = mpart.getTable(); - Partition part = convertToPart(mpart); - parts.add(part); - - if ("TRUE".equalsIgnoreCase(mtbl.getParameters().get("PARTITION_LEVEL_PRIVILEGE"))) { - String partName = Warehouse.makePartName(this.convertToFieldSchemas(mtbl - .getPartitionKeys()), part.getValues()); - PrincipalPrivilegeSet partAuth = this.getPartitionPrivilegeSet(dbName, - tblName, partName, userName, groupNames); - part.setPrivileges(partAuth); - } + List mparts = listMPartitions(dbName, tblName, max); + List parts = new ArrayList<>(mparts.size()); + for (MPartition mpart : mparts) { + MTable mtbl = mpart.getTable(); + Partition part = convertToPart(mpart); + parts.add(part); + + if ("TRUE".equalsIgnoreCase(mtbl.getParameters().get("PARTITION_LEVEL_PRIVILEGE"))) { + String partName = Warehouse.makePartName(this.convertToFieldSchemas(mtbl + .getPartitionKeys()), part.getValues()); + PrincipalPrivilegeSet partAuth = this.getPartitionPrivilegeSet(dbName, + tblName, partName, userName, groupNames); + part.setPrivileges(partAuth); } } - success = commitTransaction(); + committed = commitTransaction(); return parts; } finally { - rollbackAndCleanup(success, queryWrapper); + if (!committed) { + rollbackTransaction(); + } } } @@ -2623,7 +2724,8 @@ private PartitionValuesResponse getDistinctValuesForPartitionsNoTxn(String dbNam * has types of String, and if resultsCol is null, the types are MPartition. */ private Collection getPartitionPsQueryResults(String dbName, String tableName, - List part_vals, short max_parts, String resultsCol, QueryWrapper queryWrapper) + List part_vals, short max_parts, + String resultsCol) throws MetaException, NoSuchObjectException { dbName = HiveStringUtils.normalizeIdentifier(dbName); tableName = HiveStringUtils.normalizeIdentifier(tableName); @@ -2648,37 +2750,37 @@ private Collection getPartitionPsQueryResults(String dbName, String tableName, if (part_vals.size() < numPartKeys) { partNameMatcher += ".*"; } - Query query = queryWrapper.query = pm.newQuery(MPartition.class); - StringBuilder queryFilter = new StringBuilder("table.database.name == dbName"); - queryFilter.append(" && table.tableName == tableName"); - queryFilter.append(" && partitionName.matches(partialRegex)"); - query.setFilter(queryFilter.toString()); - query.declareParameters("java.lang.String dbName, " - + "java.lang.String tableName, java.lang.String partialRegex"); - if (max_parts >= 0) { - // User specified a row limit, set it on the Query - query.setRange(0, max_parts); - } - if (resultsCol != null && !resultsCol.isEmpty()) { - query.setResult(resultsCol); - } + try (QueryWrapper query = new QueryWrapper(pm.newQuery(MPartition.class))) { + StringBuilder queryFilter = new StringBuilder("table.database.name == dbName"); + queryFilter.append(" && table.tableName == tableName"); + queryFilter.append(" && partitionName.matches(partialRegex)"); + query.setFilter(queryFilter.toString()); + query.declareParameters("java.lang.String dbName, " + + "java.lang.String tableName, java.lang.String partialRegex"); + if (max_parts >= 0) { + // User specified a row limit, set it on the Query + query.setRange(0, max_parts); + } + if (resultsCol != null && !resultsCol.isEmpty()) { + query.setResult(resultsCol); + } - return (Collection) query.execute(dbName, tableName, partNameMatcher); + return new ArrayList<>((List) query.execute(dbName, tableName, partNameMatcher)); + } } @Override public List listPartitionsPsWithAuth(String db_name, String tbl_name, List part_vals, short max_parts, String userName, List groupNames) throws MetaException, InvalidObjectException, NoSuchObjectException { - List partitions = new ArrayList(); - boolean success = false; - QueryWrapper queryWrapper = new QueryWrapper(); + List partitions = new ArrayList<>(); + boolean committed = false; try { openTransaction(); LOG.debug("executing listPartitionNamesPsWithAuth"); Collection parts = getPartitionPsQueryResults(db_name, tbl_name, - part_vals, max_parts, null, queryWrapper); + part_vals, max_parts, null); MTable mtbl = getMTable(db_name, tbl_name); for (Object o : parts) { Partition part = convertToPart((MPartition) o); @@ -2693,9 +2795,11 @@ private Collection getPartitionPsQueryResults(String dbName, String tableName, } partitions.add(part); } - success = commitTransaction(); + committed = commitTransaction(); } finally { - rollbackAndCleanup(success, queryWrapper); + if(!committed) { + rollbackTransaction(); + } } return partitions; } @@ -2704,50 +2808,44 @@ private Collection getPartitionPsQueryResults(String dbName, String tableName, public List listPartitionNamesPs(String dbName, String tableName, List part_vals, short max_parts) throws MetaException, NoSuchObjectException { List partitionNames = new ArrayList(); - boolean success = false; - QueryWrapper queryWrapper = new QueryWrapper(); + boolean committed = false; try { openTransaction(); LOG.debug("Executing listPartitionNamesPs"); Collection names = getPartitionPsQueryResults(dbName, tableName, - part_vals, max_parts, "partitionName", queryWrapper); + part_vals, max_parts, "partitionName"); for (Object o : names) { partitionNames.add((String) o); } - success = commitTransaction(); + committed = commitTransaction(); } finally { - rollbackAndCleanup(success, queryWrapper); + if(!committed) { + rollbackTransaction(); + } } return partitionNames; } // TODO:pc implement max - private List listMPartitions(String dbName, String tableName, int max, QueryWrapper queryWrapper) { - boolean success = false; - List mparts = null; - try { - openTransaction(); - LOG.debug("Executing listMPartitions"); + private List listMPartitions(String dbName, String tableName, int max) { + LOG.debug("Executing listMPartitions"); + try (THandler th = new THandler(); + QueryWrapper query = new QueryWrapper(pm.newQuery(MPartition.class, + "table.tableName == t1 && table.database.name == t2"))) { dbName = HiveStringUtils.normalizeIdentifier(dbName); tableName = HiveStringUtils.normalizeIdentifier(tableName); - Query query = queryWrapper.query = pm.newQuery(MPartition.class, "table.tableName == t1 && table.database.name == t2"); query.declareParameters("java.lang.String t1, java.lang.String t2"); query.setOrdering("partitionName ascending"); if (max > 0) { query.setRange(0, max); } - mparts = (List) query.execute(tableName, dbName); + List mparts = (List) query.execute(tableName, dbName); LOG.debug("Done executing query for listMPartitions"); pm.retrieveAll(mparts); - success = commitTransaction(); - LOG.debug("Done retrieving all objects for listMPartitions " + mparts); - } finally { - if (!success) { - rollbackTransaction(); - } + LOG.debug("Done retrieving all objects for listMPartitions {}", mparts); + return new ArrayList<>(mparts); } - return mparts; } @Override @@ -3720,22 +3818,23 @@ private void removeUnusedColumnDescriptor(MColumnDescriptor oldCD) { return; } - boolean success = false; - QueryWrapper queryWrapper = new QueryWrapper(); + boolean committed = false; try { openTransaction(); LOG.debug("execute removeUnusedColumnDescriptor"); - List referencedSDs = listStorageDescriptorsWithCD(oldCD, 1, queryWrapper); + List referencedSDs = listStorageDescriptorsWithCD(oldCD, 1); //if no other SD references this CD, we can throw it out. if (referencedSDs != null && referencedSDs.isEmpty()) { pm.retrieve(oldCD); pm.deletePersistent(oldCD); } - success = commitTransaction(); + committed = commitTransaction(); LOG.debug("successfully deleted a CD in removeUnusedColumnDescriptor"); } finally { - rollbackAndCleanup(success, queryWrapper); + if(!committed) { + rollbackTransaction(); + } } } @@ -3760,36 +3859,27 @@ private void preDropStorageDescriptor(MStorageDescriptor msd) { /** * Get a list of storage descriptors that reference a particular Column Descriptor + * Should be always executed inside a transaction. * @param oldCD the column descriptor to get storage descriptors for * @param maxSDs the maximum number of SDs to return * @return a list of storage descriptors */ private List listStorageDescriptorsWithCD( MColumnDescriptor oldCD, - long maxSDs, - QueryWrapper queryWrapper) { - boolean success = false; - List sds = null; - try { - openTransaction(); - LOG.debug("Executing listStorageDescriptorsWithCD"); - Query query = queryWrapper.query = pm.newQuery(MStorageDescriptor.class, "this.cd == inCD"); + long maxSDs) { + LOG.debug("Executing listStorageDescriptorsWithCD"); + try (QueryWrapper query = new QueryWrapper(pm.newQuery(MStorageDescriptor.class, "this.cd == inCD"))) { query.declareParameters("MColumnDescriptor inCD"); if (maxSDs >= 0) { // User specified a row limit, set it on the Query query.setRange(0, maxSDs); } - sds = (List)query.execute(oldCD); + List sds = (List) query.execute(oldCD); LOG.debug("Done executing query for listStorageDescriptorsWithCD"); pm.retrieveAll(sds); - success = commitTransaction(); LOG.debug("Done retrieving all objects for listStorageDescriptorsWithCD"); - } finally { - if (!success) { - rollbackTransaction(); - } + return new ArrayList<>(sds); } - return sds; } private int getColumnIndexFromTableColumns(List cols, String col) { @@ -4494,8 +4584,7 @@ private MRoleMap getMSecurityUserRoleMap(String userName, PrincipalType principa @Override public boolean removeRole(String roleName) throws MetaException, NoSuchObjectException { - boolean success = false; - QueryWrapper queryWrapper = new QueryWrapper(); + boolean committed = false; try { openTransaction(); MRole mRol = getMRole(roleName); @@ -4504,60 +4593,60 @@ public boolean removeRole(String roleName) throws MetaException, // first remove all the membership, the membership that this role has // been granted List roleMap = listMRoleMembers(mRol.getRoleName()); - if (roleMap.size() > 0) { + if (!roleMap.isEmpty()) { pm.deletePersistentAll(roleMap); } List roleMember = listMSecurityPrincipalMembershipRole(mRol - .getRoleName(), PrincipalType.ROLE, queryWrapper); - if (roleMember.size() > 0) { + .getRoleName(), PrincipalType.ROLE); + if (!roleMember.isEmpty()) { pm.deletePersistentAll(roleMember); } - queryWrapper.close(); // then remove all the grants List userGrants = listPrincipalMGlobalGrants( mRol.getRoleName(), PrincipalType.ROLE); - if (userGrants.size() > 0) { + if (!userGrants.isEmpty()) { pm.deletePersistentAll(userGrants); } List dbGrants = listPrincipalAllDBGrant(mRol - .getRoleName(), PrincipalType.ROLE, queryWrapper); - if (dbGrants.size() > 0) { + .getRoleName(), PrincipalType.ROLE); + if (!dbGrants.isEmpty()) { pm.deletePersistentAll(dbGrants); } - queryWrapper.close(); + List tabPartGrants = listPrincipalAllTableGrants( - mRol.getRoleName(), PrincipalType.ROLE, queryWrapper); - if (tabPartGrants.size() > 0) { + mRol.getRoleName(), PrincipalType.ROLE); + if (!tabPartGrants.isEmpty()) { pm.deletePersistentAll(tabPartGrants); } - queryWrapper.close(); + List partGrants = listPrincipalAllPartitionGrants( - mRol.getRoleName(), PrincipalType.ROLE, queryWrapper); - if (partGrants.size() > 0) { + mRol.getRoleName(), PrincipalType.ROLE); + if (!partGrants.isEmpty()) { pm.deletePersistentAll(partGrants); } - queryWrapper.close(); + List tblColumnGrants = listPrincipalAllTableColumnGrants( - mRol.getRoleName(), PrincipalType.ROLE, queryWrapper); - if (tblColumnGrants.size() > 0) { + mRol.getRoleName(), PrincipalType.ROLE); + if (!tblColumnGrants.isEmpty()) { pm.deletePersistentAll(tblColumnGrants); } - queryWrapper.close(); + List partColumnGrants = listPrincipalAllPartitionColumnGrants( - mRol.getRoleName(), PrincipalType.ROLE, queryWrapper); - if (partColumnGrants.size() > 0) { + mRol.getRoleName(), PrincipalType.ROLE); + if (!partColumnGrants.isEmpty()) { pm.deletePersistentAll(partColumnGrants); } - queryWrapper.close(); // finally remove the role pm.deletePersistent(mRol); } - success = commitTransaction(); + committed = commitTransaction(); } finally { - rollbackAndCleanup(success, queryWrapper); + if (!committed) { + rollbackTransaction(); + } } - return success; + return committed; } /** @@ -4676,28 +4765,22 @@ private void getAllRoleAncestors(Set processedRoleNames, List return result; } + /** + * Should be always executed inside transaction. + */ @SuppressWarnings("unchecked") private List listMSecurityPrincipalMembershipRole(final String roleName, - final PrincipalType principalType, - QueryWrapper queryWrapper) { - boolean success = false; - List mRoleMemebership = null; - try { - LOG.debug("Executing listMSecurityPrincipalMembershipRole"); + final PrincipalType principalType) { + LOG.debug("Executing listMSecurityPrincipalMembershipRole"); - openTransaction(); - Query query = queryWrapper.query = pm.newQuery(MRoleMap.class, "principalName == t1 && principalType == t2"); + try(QueryWrapper query = new QueryWrapper(pm.newQuery(MRoleMap.class, + "principalName == t1 && principalType == t2"))) { query.declareParameters("java.lang.String t1, java.lang.String t2"); - mRoleMemebership = (List) query.execute(roleName, principalType.toString()); + List mRoleMemebership = (List) query.execute(roleName, principalType.toString()); pm.retrieveAll(mRoleMemebership); - success = commitTransaction(); LOG.debug("Done retrieving all objects for listMSecurityPrincipalMembershipRole"); - } finally { - if (!success) { - rollbackTransaction(); - } + return new ArrayList<>(mRoleMemebership); } - return mRoleMemebership; } @Override @@ -5737,22 +5820,12 @@ public boolean revokePrivileges(PrivilegeBag privileges, boolean grantOption) @Override public List listPrincipalDBGrantsAll( String principalName, PrincipalType principalType) { - QueryWrapper queryWrapper = new QueryWrapper(); - try { - return convertDB(listPrincipalAllDBGrant(principalName, principalType, queryWrapper)); - } finally { - queryWrapper.close(); - } + return convertDB(listPrincipalAllDBGrant(principalName, principalType)); } @Override public List listDBGrantsAll(String dbName) { - QueryWrapper queryWrapper = new QueryWrapper(); - try { - return convertDB(listDatabaseGrants(dbName, queryWrapper)); - } finally { - queryWrapper.close(); - } + return convertDB(listDatabaseGrants(dbName)); } private List convertDB(List privs) { @@ -5774,34 +5847,23 @@ public boolean revokePrivileges(PrivilegeBag privileges, boolean grantOption) @SuppressWarnings("unchecked") private List listPrincipalAllDBGrant(String principalName, - PrincipalType principalType, - QueryWrapper queryWrapper) { - boolean success = false; - Query query = null; - List mSecurityDBList = null; - try { - LOG.debug("Executing listPrincipalAllDBGrant"); - - openTransaction(); + PrincipalType principalType) { + LOG.debug("Executing listPrincipalAllDBGrant"); + try (THandler th = new THandler(); + QueryWrapper query = new QueryWrapper(pm.newQuery(MDBPrivilege.class))) { + List mSecurityDBList; if (principalName != null && principalType != null) { - query = queryWrapper.query = pm.newQuery(MDBPrivilege.class, "principalName == t1 && principalType == t2"); + query.setFilter("principalName == t1 && principalType == t2"); query.declareParameters("java.lang.String t1, java.lang.String t2"); mSecurityDBList = (List) query.execute(principalName, principalType.toString()); } else { - query = queryWrapper.query = pm.newQuery(MDBPrivilege.class); mSecurityDBList = (List) query.execute(); } pm.retrieveAll(mSecurityDBList); - success = commitTransaction(); - LOG.debug("Done retrieving all objects for listPrincipalAllDBGrant"); - } finally { - if (!success) { - rollbackTransaction(); - } + return new ArrayList<>(mSecurityDBList); } - return mSecurityDBList; } @SuppressWarnings("unchecked") @@ -5953,24 +6015,17 @@ public void dropPartitionAllColumnGrantsNoTxn( } @SuppressWarnings("unchecked") - private List listDatabaseGrants(String dbName, QueryWrapper queryWrapper) { + private List listDatabaseGrants(String dbName) { dbName = HiveStringUtils.normalizeIdentifier(dbName); - boolean success = false; - try { - LOG.debug("Executing listDatabaseGrants"); + try (THandler th = new THandler(); + QueryWrapper query = new QueryWrapper(pm.newQuery(MDBPrivilege.class, + "database.name == t1"))) { - openTransaction(); - Query query = queryWrapper.query = pm.newQuery(MDBPrivilege.class, "database.name == t1"); query.declareParameters("java.lang.String t1"); List mSecurityDBList = (List) query.executeWithArray(dbName); pm.retrieveAll(mSecurityDBList); - success = commitTransaction(); LOG.debug("Done retrieving all objects for listDatabaseGrants"); - return mSecurityDBList; - } finally { - if (!success) { - rollbackTransaction(); - } + return new ArrayList<>(mSecurityDBList); } } @@ -5980,24 +6035,16 @@ public void dropPartitionAllColumnGrantsNoTxn( tableName = HiveStringUtils.normalizeIdentifier(tableName); dbName = HiveStringUtils.normalizeIdentifier(dbName); - boolean success = false; - List mSecurityTabPartList = null; - try { - openTransaction(); + try (THandler th = new THandler()) { LOG.debug("Executing listPartitionGrants"); - mSecurityTabPartList = queryByPartitionNames( + List mSecurityTabPartList = queryByPartitionNames( dbName, tableName, partNames, MPartitionPrivilege.class, "partition.table.tableName", "partition.table.database.name", "partition.partitionName"); LOG.debug("Done executing query for listPartitionGrants"); pm.retrieveAll(mSecurityTabPartList); - success = commitTransaction(); LOG.debug("Done retrieving all objects for listPartitionGrants"); - } finally { - if (!success) { - rollbackTransaction(); - } + return mSecurityTabPartList; } - return mSecurityTabPartList; } private void dropPartitionGrantsNoTxn(String dbName, String tableName, List partNames) { @@ -6363,30 +6410,24 @@ private void dropPartitionGrantsNoTxn(String dbName, String tableName, List listPrincipalAllTableGrants( - String principalName, PrincipalType principalType, QueryWrapper queryWrapper) { - boolean success = false; - List mSecurityTabPartList = null; - try { - LOG.debug("Executing listPrincipalAllTableGrants"); + String principalName, PrincipalType principalType) { + LOG.debug("Executing listPrincipalAllTableGrants"); - openTransaction(); - Query query = queryWrapper.query = pm.newQuery(MTablePrivilege.class, - "principalName == t1 && principalType == t2"); + try (QueryWrapper query = new QueryWrapper(pm.newQuery(MTablePrivilege.class, + "principalName == t1 && principalType == t2"))) { query.declareParameters("java.lang.String t1, java.lang.String t2"); - mSecurityTabPartList = (List) query.execute( - principalName, principalType.toString()); + List mSecurityTabPartList = + (List) query.execute(principalName, principalType.toString()); pm.retrieveAll(mSecurityTabPartList); - success = commitTransaction(); LOG.debug("Done retrieving all objects for listPrincipalAllTableGrants"); - } finally { - if (!success) { - rollbackTransaction(); - } + return new ArrayList<>(mSecurityTabPartList); } - return mSecurityTabPartList; } @Override @@ -6462,27 +6503,22 @@ private void dropPartitionGrantsNoTxn(String dbName, String tableName, List listPrincipalAllPartitionGrants(String principalName, - PrincipalType principalType, QueryWrapper queryWrapper) { - boolean success = false; - List mSecurityTabPartList = null; - try { - openTransaction(); - LOG.debug("Executing listPrincipalAllPartitionGrants"); - Query query = queryWrapper.query = pm.newQuery(MPartitionPrivilege.class, "principalName == t1 && principalType == t2"); + PrincipalType principalType) { + LOG.debug("Executing listPrincipalAllPartitionGrants"); + try (QueryWrapper query = new QueryWrapper(pm.newQuery(MPartitionPrivilege.class, + "principalName == t1 && principalType == t2"))) { query.declareParameters("java.lang.String t1, java.lang.String t2"); - mSecurityTabPartList = + List mSecurityTabPartList = (List) query.execute(principalName, principalType.toString()); pm.retrieveAll(mSecurityTabPartList); - success = commitTransaction(); LOG.debug("Done retrieving all objects for listPrincipalAllPartitionGrants"); - } finally { - if (!success) { - rollbackTransaction(); - } + return new ArrayList<>(mSecurityTabPartList); } - return mSecurityTabPartList; } @Override @@ -6561,31 +6597,24 @@ private void dropPartitionGrantsNoTxn(String dbName, String tableName, List listPrincipalAllTableColumnGrants(String principalName, - PrincipalType principalType, QueryWrapper queryWrapper) { - boolean success = false; - - List mSecurityColumnList = null; - try { - LOG.debug("Executing listPrincipalAllTableColumnGrants"); + PrincipalType principalType) { + LOG.debug("Executing listPrincipalAllTableColumnGrants"); - openTransaction(); - Query query = queryWrapper.query = - pm.newQuery(MTableColumnPrivilege.class, "principalName == t1 && principalType == t2"); + try (QueryWrapper query = new QueryWrapper(pm.newQuery(MTableColumnPrivilege.class, + "principalName == t1 && principalType == t2"))) { query.declareParameters("java.lang.String t1, java.lang.String t2"); - mSecurityColumnList = + List mSecurityColumnList = (List) query.execute(principalName, principalType.toString()); pm.retrieveAll(mSecurityColumnList); - success = commitTransaction(); LOG.debug("Done retrieving all objects for listPrincipalAllTableColumnGrants"); - } finally { - if (!success) { - rollbackTransaction(); - } + return new ArrayList<>(mSecurityColumnList); } - return mSecurityColumnList; } @Override @@ -6665,30 +6694,24 @@ private void dropPartitionGrantsNoTxn(String dbName, String tableName, List listPrincipalAllPartitionColumnGrants( - String principalName, PrincipalType principalType, QueryWrapper queryWrapper) { - boolean success = false; - List mSecurityColumnList = null; - try { - LOG.debug("Executing listPrincipalAllTableColumnGrants"); + String principalName, PrincipalType principalType) { + LOG.debug("Executing listPrincipalAllTableColumnGrants"); - openTransaction(); - Query query = queryWrapper.query = - pm.newQuery(MPartitionColumnPrivilege.class, "principalName == t1 && principalType == t2"); + try (QueryWrapper query = new QueryWrapper(pm.newQuery(MPartitionColumnPrivilege.class, + "principalName == t1 && principalType == t2"))) { query.declareParameters("java.lang.String t1, java.lang.String t2"); - mSecurityColumnList = + List mSecurityColumnList = (List) query.execute(principalName, principalType.toString()); pm.retrieveAll(mSecurityColumnList); - success = commitTransaction(); LOG.debug("Done retrieving all objects for listPrincipalAllTableColumnGrants"); - } finally { - if (!success) { - rollbackTransaction(); - } + return new ArrayList<>(mSecurityColumnList); } - return mSecurityColumnList; } @Override @@ -6770,24 +6793,11 @@ private String getPartitionStr(Table tbl, Map partName) throws In * is used by HiveMetaTool. This API **shouldn't** be exposed via Thrift. * */ - public Collection executeJDOQLSelect(String queryStr, QueryWrapper queryWrapper) { - boolean committed = false; - Collection result = null; - try { - openTransaction(); - Query query = queryWrapper.query = pm.newQuery(queryStr); - result = ((Collection) query.execute()); - committed = commitTransaction(); - - if (committed) { - return result; - } else { - return null; - } - } finally { - if (!committed) { - rollbackTransaction(); - } + public Collection executeJDOQLSelect(String queryStr) { + try (THandler h = new THandler(); + QueryWrapper query = new QueryWrapper(pm.newQuery(queryStr))) { + List result = ((List) query.execute()); + return new ArrayList<>(result); } } @@ -6799,21 +6809,9 @@ private String getPartitionStr(Table tbl, Map partName) throws In * */ public long executeJDOQLUpdate(String queryStr) { - boolean committed = false; - long numUpdated = 0; - Query query = null; - try { - openTransaction(); - query = pm.newQuery(queryStr); - numUpdated = (Long) query.execute(); - committed = commitTransaction(); - if (committed) { - return numUpdated; - } else { - return -1; - } - } finally { - rollbackAndCleanup(committed, query); + try (THandler h = new THandler(); + QueryWrapper query = new QueryWrapper(pm.newQuery(queryStr))) { + return (Long) query.execute(); } } @@ -6825,25 +6823,15 @@ public long executeJDOQLUpdate(String queryStr) { * */ public Set listFSRoots() { - boolean committed = false; - Query query = null; - Set fsRoots = new HashSet(); - try { - openTransaction(); - query = pm.newQuery(MDatabase.class); + try (THandler th = new THandler(); + QueryWrapper query = new QueryWrapper(pm.newQuery(MDatabase.class))) { List mDBs = (List) query.execute(); pm.retrieveAll(mDBs); + Set fsRoots = new HashSet(mDBs.size()); for (MDatabase mDB : mDBs) { fsRoots.add(mDB.getLocationUri()); } - committed = commitTransaction(); - if (committed) { - return fsRoots; - } else { - return null; - } - } finally { - rollbackAndCleanup(committed, query); + return fsRoots; } } @@ -7254,20 +7242,15 @@ private void writeMTableColumnStatistics(Table table, MTableColumnStatistics mSt String dbName = mStatsObj.getDbName(); String tableName = mStatsObj.getTableName(); String colName = mStatsObj.getColName(); - QueryWrapper queryWrapper = new QueryWrapper(); - try { - LOG.info("Updating table level column statistics for db=" + dbName + " tableName=" + tableName - + " colName=" + colName); - validateTableCols(table, Lists.newArrayList(colName)); + LOG.info("Updating table level column statistics for db=" + dbName + " tableName=" + tableName + + " colName=" + colName); + validateTableCols(table, Lists.newArrayList(colName)); - if (oldStats != null) { - StatObjectConverter.setFieldsIntoOldStats(mStatsObj, oldStats); - } else { - pm.makePersistent(mStatsObj); - } - } finally { - queryWrapper.close(); + if (oldStats != null) { + StatObjectConverter.setFieldsIntoOldStats(mStatsObj, oldStats); + } else { + pm.makePersistent(mStatsObj); } } @@ -7292,18 +7275,13 @@ private void writeMPartitionColumnStatistics(Table table, Partition partition, } if (!foundCol) { - LOG.warn("Column " + colName + " for which stats gathering is requested doesn't exist."); + LOG.warn("Column {} for which stats gathering is requested doesn't exist.", colName); } - QueryWrapper queryWrapper = new QueryWrapper(); - try { - if (oldStats != null) { - StatObjectConverter.setFieldsIntoOldStats(mStatsObj, oldStats); - } else { - pm.makePersistent(mStatsObj); - } - } finally { - queryWrapper.close(); + if (oldStats != null) { + StatObjectConverter.setFieldsIntoOldStats(mStatsObj, oldStats); + } else { + pm.makePersistent(mStatsObj); } } @@ -7318,16 +7296,14 @@ private void writeMPartitionColumnStatistics(Table table, Partition partition, */ private Map getPartitionColStats(Table table, List colNames) throws NoSuchObjectException, MetaException { - Map statsMap = Maps.newHashMap(); - QueryWrapper queryWrapper = new QueryWrapper(); - try { - List stats = getMTableColumnStatistics(table, - colNames, queryWrapper); - for(MTableColumnStatistics cStat : stats) { - statsMap.put(cStat.getColName(), cStat); - } - } finally { - queryWrapper.close(); + List stats = getMTableColumnStatistics(table, + colNames); + if (stats == null) { + return Collections.emptyMap(); + } + Map statsMap = new HashMap<>(stats.size()); + for(MTableColumnStatistics cStat : stats) { + statsMap.put(cStat.getColName(), cStat); } return statsMap; } @@ -7389,16 +7365,11 @@ public boolean updateTableColumnStatistics(ColumnStatistics colStats) */ private Map getPartitionColStats(Table table, String partitionName, List colNames) throws NoSuchObjectException, MetaException { - Map statsMap = Maps.newHashMap(); - QueryWrapper queryWrapper = new QueryWrapper(); - try { - List stats = getMPartitionColumnStatistics(table, - Lists.newArrayList(partitionName), colNames, queryWrapper); - for(MPartitionColumnStatistics cStat : stats) { - statsMap.put(cStat.getColName(), cStat); - } - } finally { - queryWrapper.close(); + List stats = getMPartitionColumnStatistics(table, + Lists.newArrayList(partitionName), colNames); + Map statsMap = new HashMap<>(stats.size()); + for(MPartitionColumnStatistics cStat : stats) { + statsMap.put(cStat.getColName(), cStat); } return statsMap; } @@ -7448,30 +7419,28 @@ public boolean updatePartitionColumnStatistics(ColumnStatistics colStats, List getMTableColumnStatistics(Table table, List colNames, QueryWrapper queryWrapper) + private List getMTableColumnStatistics(Table table, List colNames) throws MetaException { if (colNames == null || colNames.isEmpty()) { return null; } - boolean committed = false; - try { - openTransaction(); - + validateTableCols(table, colNames); + String filter = "tableName == t1 && dbName == t2 && ("; + String paramStr = "java.lang.String t1, java.lang.String t2"; + Object[] params = new Object[colNames.size() + 2]; + params[0] = table.getTableName(); + params[1] = table.getDbName(); + for (int i = 0; i < colNames.size(); ++i) { + filter += ((i == 0) ? "" : " || ") + "colName == c" + i; + paramStr += ", java.lang.String c" + i; + params[i + 2] = colNames.get(i); + } + filter += ")"; + + try (THandler th = new THandler(); + QueryWrapper query = new QueryWrapper(pm.newQuery(MTableColumnStatistics.class))) { List result = null; - validateTableCols(table, colNames); - Query query = queryWrapper.query = pm.newQuery(MTableColumnStatistics.class); - String filter = "tableName == t1 && dbName == t2 && ("; - String paramStr = "java.lang.String t1, java.lang.String t2"; - Object[] params = new Object[colNames.size() + 2]; - params[0] = table.getTableName(); - params[1] = table.getDbName(); - for (int i = 0; i < colNames.size(); ++i) { - filter += ((i == 0) ? "" : " || ") + "colName == c" + i; - paramStr += ", java.lang.String c" + i; - params[i + 2] = colNames.get(i); - } - filter += ")"; query.setFilter(filter); query.declareParameters(paramStr); result = (List) query.executeWithArray(params); @@ -7480,18 +7449,10 @@ public boolean updatePartitionColumnStatistics(ColumnStatistics colStats, List(result); } catch (Exception ex) { LOG.error("Error retrieving statistics via jdo", ex); - if (ex instanceof MetaException) { - throw (MetaException) ex; - } throw new MetaException(ex.getMessage()); - } finally { - if (!committed) { - rollbackTransaction(); - } } } @@ -7533,15 +7494,13 @@ protected ColumnStatistics getSqlResult(GetHelper ctx) throws @Override protected ColumnStatistics getJdoResult( GetHelper ctx) throws MetaException { - QueryWrapper queryWrapper = new QueryWrapper(); - try { - List mStats = getMTableColumnStatistics(getTable(), colNames, queryWrapper); - if (mStats.isEmpty()) return null; + List mStats = getMTableColumnStatistics(getTable(), colNames); + if (mStats == null || mStats.isEmpty()) return null; // LastAnalyzed is stored per column, but thrift object has it per multiple columns. // Luckily, nobody actually uses it, so we will set to lowest value of all columns for now. ColumnStatisticsDesc desc = StatObjectConverter.getTableColumnStatisticsDesc(mStats.get(0)); - List statObjs = new ArrayList(mStats.size()); + List statObjs = new ArrayList<>(mStats.size()); for (MTableColumnStatistics mStat : mStats) { if (desc.getLastAnalyzed() > mStat.getLastAnalyzed()) { desc.setLastAnalyzed(mStat.getLastAnalyzed()); @@ -7550,9 +7509,6 @@ protected ColumnStatistics getJdoResult( Deadline.checkTimeout(); } return new ColumnStatistics(desc, statObjs); - } finally { - queryWrapper.close(); - } } }.run(true); } @@ -7578,11 +7534,9 @@ protected ColumnStatistics getJdoResult( @Override protected List getJdoResult( GetHelper> ctx) throws MetaException, NoSuchObjectException { - QueryWrapper queryWrapper = new QueryWrapper(); - try { List mStats = - getMPartitionColumnStatistics(getTable(), partNames, colNames, queryWrapper); - List result = new ArrayList( + getMPartitionColumnStatistics(getTable(), partNames, colNames); + List result = new ArrayList<>( Math.min(mStats.size(), partNames.size())); String lastPartName = null; List curList = null; @@ -7599,16 +7553,13 @@ protected ColumnStatistics getJdoResult( continue; } csd = StatObjectConverter.getPartitionColumnStatisticsDesc(mStatsObj); - curList = new ArrayList(colNames.size()); + curList = new ArrayList<>(colNames.size()); } curList.add(StatObjectConverter.getPartitionColumnStatisticsObj(mStatsObj, enableBitVector)); lastPartName = partName; Deadline.checkTimeout(); } return result; - } finally { - queryWrapper.close(); - } } }.run(true); } @@ -7680,40 +7631,40 @@ public void flushCache() { } private List getMPartitionColumnStatistics( - Table table, List partNames, List colNames, QueryWrapper queryWrapper) + Table table, List partNames, List colNames) throws NoSuchObjectException, MetaException { - boolean committed = false; + // We are not going to verify SD for each partition. Just verify for the table. + // ToDo: we need verify the partition column instead try { - openTransaction(); - // We are not going to verify SD for each partition. Just verify for the table. - // ToDo: we need verify the partition column instead - try { - validateTableCols(table, colNames); - } catch (MetaException me) { - LOG.warn("The table does not have the same column definition as its partition."); - } - Query query = queryWrapper.query = pm.newQuery(MPartitionColumnStatistics.class); - String paramStr = "java.lang.String t1, java.lang.String t2"; - String filter = "tableName == t1 && dbName == t2 && ("; - Object[] params = new Object[colNames.size() + partNames.size() + 2]; - int i = 0; - params[i++] = table.getTableName(); - params[i++] = table.getDbName(); - int firstI = i; - for (String s : partNames) { - filter += ((i == firstI) ? "" : " || ") + "partitionName == p" + i; - paramStr += ", java.lang.String p" + i; - params[i++] = s; - } - filter += ") && ("; - firstI = i; - for (String s : colNames) { - filter += ((i == firstI) ? "" : " || ") + "colName == c" + i; - paramStr += ", java.lang.String c" + i; - params[i++] = s; - } - filter += ")"; + validateTableCols(table, colNames); + } catch (MetaException me) { + LOG.warn("The table does not have the same column definition as its partition."); + } + + String paramStr = "java.lang.String t1, java.lang.String t2"; + String filter = "tableName == t1 && dbName == t2 && ("; + Object[] params = new Object[colNames.size() + partNames.size() + 2]; + int i = 0; + params[i++] = table.getTableName(); + params[i++] = table.getDbName(); + int firstI = i; + for (String s : partNames) { + filter += ((i == firstI) ? "" : " || ") + "partitionName == p" + i; + paramStr += ", java.lang.String p" + i; + params[i++] = s; + } + filter += ") && ("; + firstI = i; + for (String s : colNames) { + filter += ((i == firstI) ? "" : " || ") + "colName == c" + i; + paramStr += ", java.lang.String c" + i; + params[i++] = s; + } + filter += ")"; + + try(THandler th = new THandler(); + QueryWrapper query = new QueryWrapper(pm.newQuery(MPartitionColumnStatistics.class))) { query.setFilter(filter); query.declareParameters(paramStr); query.setOrdering("partitionName ascending"); @@ -7721,19 +7672,9 @@ public void flushCache() { List result = (List) query.executeWithArray(params); pm.retrieveAll(result); - committed = commitTransaction(); - return result; + return new ArrayList<>(result); } catch (Exception ex) { - LOG.error("Error retrieving statistics via jdo", ex); - if (ex instanceof MetaException) { - throw (MetaException) ex; - } throw new MetaException(ex.getMessage()); - } finally { - if (!committed) { - rollbackTransaction(); - return Lists.newArrayList(); - } } } @@ -7896,126 +7837,86 @@ public boolean deleteTableColumnStatistics(String dbName, String tableName, Stri @Override public long cleanupEvents() { - boolean commited = false; - Query query = null; - long delCnt; LOG.debug("Begin executing cleanupEvents"); Long expiryTime = HiveConf.getTimeVar(getConf(), ConfVars.METASTORE_EVENT_EXPIRY_DURATION, TimeUnit.MILLISECONDS); Long curTime = System.currentTimeMillis(); - try { - openTransaction(); - query = pm.newQuery(MPartitionEvent.class, "curTime - eventTime > expiryTime"); + try (THandler th = new THandler(); + QueryWrapper query = new QueryWrapper(pm.newQuery(MPartitionEvent.class, + "curTime - eventTime > expiryTime"))) { query.declareParameters("java.lang.Long curTime, java.lang.Long expiryTime"); - delCnt = query.deletePersistentAll(curTime, expiryTime); - commited = commitTransaction(); - } finally { - rollbackAndCleanup(commited, query); + long delCnt = query.deletePersistentAll(curTime, expiryTime); LOG.debug("Done executing cleanupEvents"); + return delCnt; } - return delCnt; } private MDelegationToken getTokenFrom(String tokenId) { - Query query = pm.newQuery(MDelegationToken.class, "tokenIdentifier == tokenId"); - query.declareParameters("java.lang.String tokenId"); - query.setUnique(true); - MDelegationToken delegationToken = (MDelegationToken) query.execute(tokenId); - if (query != null) { - query.closeAll(); + try (QueryWrapper query = new QueryWrapper(pm.newQuery(MDelegationToken.class, + "tokenIdentifier == tokenId"))) { + query.declareParameters("java.lang.String tokenId"); + query.setUnique(true); + return (MDelegationToken) query.execute(tokenId); } - return delegationToken; } @Override public boolean addToken(String tokenId, String delegationToken) { - LOG.debug("Begin executing addToken"); - boolean committed = false; - MDelegationToken token; - try{ - openTransaction(); - token = getTokenFrom(tokenId); + try (THandler th = new THandler()) { + MDelegationToken token = getTokenFrom(tokenId); if (token == null) { // add Token, only if it already doesn't exist pm.makePersistent(new MDelegationToken(tokenId, delegationToken)); } - committed = commitTransaction(); - } finally { - if(!committed) { - rollbackTransaction(); - } + LOG.debug("Done executing addToken"); + return token == null; } - LOG.debug("Done executing addToken with status : " + committed); - return committed && (token == null); } @Override public boolean removeToken(String tokenId) { - LOG.debug("Begin executing removeToken"); - boolean committed = false; - MDelegationToken token; - try{ - openTransaction(); - token = getTokenFrom(tokenId); - if (null != token) { + try (THandler th = new THandler()) { + MDelegationToken token = getTokenFrom(tokenId); + if (token != null) { pm.deletePersistent(token); } - committed = commitTransaction(); - } finally { - if(!committed) { - rollbackTransaction(); - } + LOG.debug("Done executing removeToken"); + return token != null; } - LOG.debug("Done executing removeToken with status : " + committed); - return committed && (token != null); } @Override public String getToken(String tokenId) { - LOG.debug("Begin executing getToken"); - boolean committed = false; - MDelegationToken token; - try{ - openTransaction(); - token = getTokenFrom(tokenId); - if (null != token) { + try (THandler th = new THandler()) { + MDelegationToken token = getTokenFrom(tokenId); + if (token != null) { pm.retrieve(token); } - committed = commitTransaction(); - } finally { - if(!committed) { - rollbackTransaction(); - } + LOG.debug("Done executing getToken"); + return token != null ? token.getTokenStr() : null; } - LOG.debug("Done executing getToken with status : " + committed); - return (null == token) ? null : token.getTokenStr(); } @Override public List getAllTokenIdentifiers() { LOG.debug("Begin executing getAllTokenIdentifiers"); - boolean committed = false; - Query query = null; - List tokenIdents = new ArrayList(); - try { - openTransaction(); - query = pm.newQuery(MDelegationToken.class); + try (THandler th = new THandler(); + QueryWrapper query = new QueryWrapper(pm.newQuery(MDelegationToken.class))) { List tokens = (List) query.execute(); + if (tokens.isEmpty()) { + return Collections.emptyList(); + } pm.retrieveAll(tokens); - committed = commitTransaction(); - + List tokenIdents = new ArrayList<>(tokens.size()); for (MDelegationToken token : tokens) { tokenIdents.add(token.getTokenIdentifier()); } return tokenIdents; - } finally { - LOG.debug("Done executing getAllTokenIdentifers with status : " + committed); - rollbackAndCleanup(committed, query); } } @@ -8624,16 +8525,13 @@ public long getSleepInterval() { @Override public void addNotificationEvent(NotificationEvent entry) { - boolean commited = false; - Query query = null; - try { - openTransaction(); + try (THandler th = new THandler(); + QueryWrapper query = new QueryWrapper( pm.newQuery(MNotificationNextId.class))) { lockForUpdate(); - Query objectQuery = pm.newQuery(MNotificationNextId.class); - Collection ids = (Collection) objectQuery.execute(); + List ids = (List) query.execute(); MNotificationNextId mNotificationNextId = null; boolean needToPersistId; - if (ids == null || ids.size() == 0) { + if (ids.isEmpty()) { mNotificationNextId = new MNotificationNextId(1L); needToPersistId = true; } else { @@ -8646,71 +8544,54 @@ public void addNotificationEvent(NotificationEvent entry) { pm.makePersistent(mNotificationNextId); } pm.makePersistent(translateThriftToDb(entry)); - commited = commitTransaction(); } catch (Exception e) { LOG.error("couldnot get lock for update", e); - } finally { - rollbackAndCleanup(commited, query); } } @Override public void cleanNotificationEvents(int olderThan) { - boolean commited = false; - Query query = null; - try { - openTransaction(); + try (THandler th = new THandler(); + QueryWrapper query = new QueryWrapper( pm.newQuery(MNotificationLog.class))) { + query.setFilter("eventTime < tooOld"); + query.declareParameters("java.lang.Integer tooOld"); long tmp = System.currentTimeMillis() / 1000 - olderThan; int tooOld = (tmp > Integer.MAX_VALUE) ? 0 : (int) tmp; - query = pm.newQuery(MNotificationLog.class, "eventTime < tooOld"); - query.declareParameters("java.lang.Integer tooOld"); - Collection toBeRemoved = (Collection) query.execute(tooOld); - if (toBeRemoved != null && toBeRemoved.size() > 0) { + List toBeRemoved = (List) query.execute(tooOld); + if (!toBeRemoved.isEmpty()) { pm.deletePersistentAll(toBeRemoved); } - commited = commitTransaction(); - } finally { - rollbackAndCleanup(commited, query); } } @Override public CurrentNotificationEventId getCurrentNotificationEventId() { - boolean commited = false; - Query query = null; - try { - openTransaction(); - query = pm.newQuery(MNotificationNextId.class); - Collection ids = (Collection) query.execute(); + try (THandler th = new THandler(); + QueryWrapper query = new QueryWrapper( pm.newQuery(MNotificationNextId.class))) { + List ids = (List) query.execute(); long id = 0; - if (ids != null && ids.size() > 0) { + if (!ids.isEmpty()) { id = ids.iterator().next().getNextEventId() - 1; } - commited = commitTransaction(); return new CurrentNotificationEventId(id); - } finally { - rollbackAndCleanup(commited, query); } } @Override public NotificationEventsCountResponse getNotificationEventsCount(NotificationEventsCountRequest rqst) { - Long result = 0L; - boolean commited = false; - Query query = null; - try { + String queryStr = "select count(eventId) from " + MNotificationLog.class.getName() + + " where eventId > fromEventId && dbName == inputDbName"; + try (THandler th = new THandler(); + QueryWrapper query = new QueryWrapper( pm.newQuery(queryStr))) { openTransaction(); long fromEventId = rqst.getFromEventId(); String inputDbName = rqst.getDbName(); - String queryStr = "select count(eventId) from " + MNotificationLog.class.getName() - + " where eventId > fromEventId && dbName == inputDbName"; - query = pm.newQuery(queryStr); query.declareParameters("java.lang.Long fromEventId, java.lang.String inputDbName"); - result = (Long) query.execute(fromEventId, inputDbName); - commited = commitTransaction(); - return new NotificationEventsCountResponse(result.longValue()); - } finally { - rollbackAndCleanup(commited, query); + Long result = (Long) query.execute(fromEventId, inputDbName); + if (result == null) { + result = 0L; + } + return new NotificationEventsCountResponse(result); } } @@ -9272,26 +9153,6 @@ void rollbackAndCleanup(boolean success, Query query) { } } - /** - * This is a cleanup method which is used to rollback a active transaction - * if the success flag is false and close the associated QueryWrapper object. This method is used - * internally and visible for testing purposes only - * @param success Rollback the current active transaction if false - * @param queryWrapper QueryWrapper object which needs to be closed - */ - @VisibleForTesting - void rollbackAndCleanup(boolean success, QueryWrapper queryWrapper) { - try { - if (!success) { - rollbackTransaction(); - } - } finally { - if (queryWrapper != null) { - queryWrapper.close(); - } - } - } - /** * To make possible to run multiple metastore in unit test * @param twoMetastoreTesting if we are using multiple metastore in unit test diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java b/metastore/src/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java index 22e246f1c9..e8393b66ec 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java @@ -149,21 +149,14 @@ private void executeJDOQLSelect(String query) { initObjectStore(hiveConf); System.out.println("Executing query: " + query); - ObjectStore.QueryWrapper queryWrapper = new ObjectStore.QueryWrapper(); - try { - Collection result = objStore.executeJDOQLSelect(query, queryWrapper); - if (result != null) { - Iterator iter = result.iterator(); - while (iter.hasNext()) { - Object o = iter.next(); - System.out.println(o.toString()); - } - } else { - System.err.println("Encountered error during executeJDOQLSelect -" + - "commit of JDO transaction failed."); + Collection result = objStore.executeJDOQLSelect(query); + if (result != null) { + for (Object o : result) { + System.out.println(o.toString()); } - } finally { - queryWrapper.close(); + } else { + System.err.println("Encountered error during executeJDOQLSelect -" + + "commit of JDO transaction failed."); } } -- 2.14.2