From 85e4cdf5fc8b4b28ecbf052b33fa55c1ad55781f Mon Sep 17 00:00:00 2001
From: Ashutosh Chauhan
Date: Thu, 21 Jan 2016 16:56:30 -0800
Subject: [PATCH] HIVE-12907 : Improve dynamic partition loading - II

---
 .../apache/hadoop/hive/common/StatsSetupConst.java |  6 +-
 .../listener/TestDbNotificationListener.java       | 30 ++++-----
 .../org/apache/hadoop/hive/ql/metadata/Hive.java   | 73 ++++++++++++++--------
 .../apache/hadoop/hive/ql/metadata/Partition.java  |  3 +-
 4 files changed, 66 insertions(+), 46 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java b/common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java
index 029d415..c9ef647 100644
--- a/common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java
+++ b/common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java
@@ -192,7 +192,7 @@ public static boolean areColumnStatsUptoDate(Map<String, String> params, String
   // note that set basic stats false will wipe out column stats too.
   public static void setBasicStatsState(Map<String, String> params, String setting) {
     if (setting.equals(FALSE)) {
-      if (params.containsKey(COLUMN_STATS_ACCURATE)) {
+      if (params != null && params.containsKey(COLUMN_STATS_ACCURATE)) {
         params.remove(COLUMN_STATS_ACCURATE);
       }
     } else {
@@ -299,8 +299,8 @@ public static void setColumnStatsState(Map<String, String> params, List
   }
 
   public static void clearColumnStatsState(Map<String, String> params) {
-    String statsAcc = params.get(COLUMN_STATS_ACCURATE);
-    if (statsAcc != null) {
+    String statsAcc;
+    if (params != null && (statsAcc = params.get(COLUMN_STATS_ACCURATE)) != null) {
       // statsAcc may not be jason format, which will throw exception
       JSONObject stats;
       try {
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
index 56f5c8b..36b624e 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
@@ -534,35 +534,35 @@ public void sqlInsertPartition() throws Exception {
     for (NotificationEvent ne : rsp.getEvents()) LOG.debug("EVENT: " + ne.getMessage());
     // For reasons not clear to me there's one or more alter partitions after add partition and
     // insert.
-    assertEquals(25, rsp.getEventsSize());
+    assertEquals(19, rsp.getEventsSize());
     NotificationEvent event = rsp.getEvents().get(1);
     assertEquals(firstEventId + 2, event.getEventId());
     assertEquals(HCatConstants.HCAT_ADD_PARTITION_EVENT, event.getEventType());
-    event = rsp.getEvents().get(5);
-    assertEquals(firstEventId + 6, event.getEventId());
+    event = rsp.getEvents().get(3);
+    assertEquals(firstEventId + 4, event.getEventId());
     assertEquals(HCatConstants.HCAT_INSERT_EVENT, event.getEventType());
     // Make sure the files are listed in the insert
     assertTrue(event.getMessage().matches(".*\"files\":\\[\"pfile.*"));
-    event = rsp.getEvents().get(9);
-    assertEquals(firstEventId + 10, event.getEventId());
+    event = rsp.getEvents().get(6);
+    assertEquals(firstEventId + 7, event.getEventId());
     assertEquals(HCatConstants.HCAT_INSERT_EVENT, event.getEventType());
     assertTrue(event.getMessage().matches(".*\"files\":\\[\"pfile.*"));
-    event = rsp.getEvents().get(12);
-    assertEquals(firstEventId + 13, event.getEventId());
+    event = rsp.getEvents().get(9);
+    assertEquals(firstEventId + 10, event.getEventId());
     assertEquals(HCatConstants.HCAT_ADD_PARTITION_EVENT, event.getEventType());
-    event = rsp.getEvents().get(14);
-    assertEquals(firstEventId + 15, event.getEventId());
+    event = rsp.getEvents().get(10);
+    assertEquals(firstEventId + 11, event.getEventId());
     assertEquals(HCatConstants.HCAT_INSERT_EVENT, event.getEventType());
     assertTrue(event.getMessage().matches(".*\"files\":\\[\"pfile.*"));
-    event = rsp.getEvents().get(18);
-    assertEquals(firstEventId + 19, event.getEventId());
+    event = rsp.getEvents().get(13);
+    assertEquals(firstEventId + 14, event.getEventId());
     assertEquals(HCatConstants.HCAT_INSERT_EVENT, event.getEventType());
     assertTrue(event.getMessage().matches(".*\"files\":\\[\"pfile.*"));
-    event = rsp.getEvents().get(21);
-    assertEquals(firstEventId + 22, event.getEventId());
+    event = rsp.getEvents().get(16);
+    assertEquals(firstEventId + 17, event.getEventId());
     assertEquals(HCatConstants.HCAT_ADD_PARTITION_EVENT, event.getEventType());
-    event = rsp.getEvents().get(24);
-    assertEquals(firstEventId + 25, event.getEventId());
+    event = rsp.getEvents().get(18);
+    assertEquals(firstEventId + 19, event.getEventId());
     assertEquals(HCatConstants.HCAT_DROP_PARTITION_EVENT, event.getEventType());
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index efb50b2..a118b59 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -84,6 +84,7 @@
 import org.apache.hadoop.hive.metastore.api.HiveObjectType;
 import org.apache.hadoop.hive.metastore.api.Index;
 import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
+import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
@@ -631,11 +632,7 @@ public void alterPartition(String tblName, Partition newPart)
   public void alterPartition(String dbName, String tblName, Partition newPart)
       throws InvalidOperationException, HiveException {
     try {
-      // Remove the DDL time so that it gets refreshed
-      if (newPart.getParameters() != null) {
-        newPart.getParameters().remove(hive_metastoreConstants.DDL_TIME);
-      }
-      newPart.checkValidity();
+      validatePartition(newPart);
       getMSC().alter_partition(dbName, tblName, newPart.getTPartition());
     } catch (MetaException e) {
@@ -645,6 +642,14 @@ public void alterPartition(String dbName, String tblName, Partition newPart)
     }
   }
 
+  private void validatePartition(Partition newPart) throws HiveException {
+    // Remove the DDL time so that it gets refreshed
+    if (newPart.getParameters() != null) {
+      newPart.getParameters().remove(hive_metastoreConstants.DDL_TIME);
+    }
+    newPart.checkValidity();
+  }
+
   /**
    * Updates the existing table metadata with the new metadata.
    *
@@ -1427,14 +1432,13 @@ public void loadPartition(Path loadPath, String tableName,
    * @param isSrcLocal
    *          If the source directory is LOCAL
    * @param isAcid true if this is an ACID operation
-   * @throws JSONException 
+   * @throws JSONException
    */
   public Partition loadPartition(Path loadPath, Table tbl,
       Map<String, String> partSpec, boolean replace, boolean inheritTableSpecs,
       boolean isSkewedStoreAsSubdir, boolean isSrcLocal, boolean isAcid) throws HiveException {
     Path tblDataLocationPath = tbl.getDataLocation();
-    Partition newTPart = null;
     try {
       /**
        * Move files before creating the partition since down stream processes
@@ -1474,19 +1478,25 @@ public Partition loadPartition(Path loadPath, Table tbl,
       } else {
         newPartPath = oldPartPath;
       }
-      List<Path> newFiles = null;
       if (replace) {
         Hive.replaceFiles(tbl.getPath(), loadPath, newPartPath, oldPartPath, getConf(),
             isSrcLocal);
       } else {
-        newFiles = new ArrayList<Path>();
+        if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) {
+          newFiles = new ArrayList<>();
+        }
+        FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
         Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, isAcid, newFiles);
       }
+      Partition newTPart = oldPart != null ? oldPart : new Partition(tbl, partSpec, newPartPath);
+      alterPartitionSpecInMemory(tbl, partSpec, newTPart.getTPartition(), inheritTableSpecs, newPartPath.toString());
+      validatePartition(newTPart);
+      if (oldPart != null && null != newFiles) {
+        fireInsertEvent(tbl, partSpec, newFiles);
+      }
 
-      newTPart = getPartition(tbl, partSpec, true, newPartPath.toString(),
-          inheritTableSpecs, newFiles);
       //column stats will be inaccurate
       StatsSetupConst.clearColumnStatsState(newTPart.getParameters());
@@ -1500,18 +1510,18 @@ public Partition loadPartition(Path loadPath, Table tbl,
         /* Add list bucketing location mappings.
          */
         skewedInfo.setSkewedColValueLocationMaps(skewedColValueLocationMaps);
         newCreatedTpart.getSd().setSkewedInfo(skewedInfo);
-        if(!this.getConf().getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {
-          StatsSetupConst.setBasicStatsState(newTPart.getParameters(), StatsSetupConst.FALSE);
-        }
-        alterPartition(tbl.getDbName(), tbl.getTableName(), new Partition(tbl, newCreatedTpart));
-        newTPart = getPartition(tbl, partSpec, true, newPartPath.toString(), inheritTableSpecs,
-            newFiles);
-        return new Partition(tbl, newCreatedTpart);
       }
       if(!this.getConf().getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {
         StatsSetupConst.setBasicStatsState(newTPart.getParameters(), StatsSetupConst.FALSE);
       }
-      alterPartition(tbl.getDbName(), tbl.getTableName(), new Partition(tbl, newTPart.getTPartition()));
+      if (oldPart == null) {
+        newTPart.getTPartition().setParameters(new HashMap<String, String>());
+        MetaStoreUtils.populateQuickStats(HiveStatsUtils.getFileStatusRecurse(newPartPath, -1, newPartPath.getFileSystem(conf)), newTPart.getParameters());
+        getMSC().add_partition(newTPart.getTPartition());
+      } else {
+        alterPartition(tbl.getDbName(), tbl.getTableName(), new Partition(tbl, newTPart.getTPartition()));
+      }
+      return newTPart;
     } catch (IOException e) {
       LOG.error(StringUtils.stringifyException(e));
       throw new HiveException(e);
@@ -1521,8 +1531,10 @@ public Partition loadPartition(Path loadPath, Table tbl,
     } catch (InvalidOperationException e) {
       LOG.error(StringUtils.stringifyException(e));
       throw new HiveException(e);
+    } catch (TException e) {
+      LOG.error(StringUtils.stringifyException(e));
+      throw new HiveException(e);
     }
-    return newTPart;
   }
 
   /**
@@ -1622,7 +1634,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
    * @param txnId txnId, can be 0 unless isAcid == true
    * @return partition map details (PartitionSpec and Partition)
    * @throws HiveException
-   * @throws JSONException 
+   * @throws JSONException
    */
   public Map<Map<String, String>, Partition> loadDynamicPartitions(Path loadPath, String tableName,
       Map<String, String> partSpec, boolean replace,
@@ -1991,6 +2003,20 @@ private void alterPartitionSpec(Table tbl,
       org.apache.hadoop.hive.metastore.api.Partition tpart,
       boolean inheritTableSpecs,
       String partPath) throws HiveException, InvalidOperationException {
+
+    alterPartitionSpecInMemory(tbl, partSpec, tpart, inheritTableSpecs, partPath);
+    String fullName = tbl.getTableName();
+    if (!org.apache.commons.lang.StringUtils.isEmpty(tbl.getDbName())) {
+      fullName = tbl.getDbName() + "." + tbl.getTableName();
+    }
+    alterPartition(fullName, new Partition(tbl, tpart));
+  }
+
+  private void alterPartitionSpecInMemory(Table tbl,
+      Map<String, String> partSpec,
+      org.apache.hadoop.hive.metastore.api.Partition tpart,
+      boolean inheritTableSpecs,
+      String partPath) throws HiveException, InvalidOperationException {
     LOG.debug("altering partition for table " + tbl.getTableName() + " with partition spec : "
         + partSpec);
     if (inheritTableSpecs) {
@@ -2007,11 +2033,6 @@ private void alterPartitionSpec(Table tbl,
       throw new HiveException("new partition path should not be null or empty.");
     }
     tpart.getSd().setLocation(partPath);
-    String fullName = tbl.getTableName();
-    if (!org.apache.commons.lang.StringUtils.isEmpty(tbl.getDbName())) {
-      fullName = tbl.getDbName() + "."
-          + tbl.getTableName();
-    }
-    alterPartition(fullName, new Partition(tbl, tpart));
   }
 
   private void fireInsertEvent(Table tbl, Map<String, String> partitionSpec, List<Path> newFiles)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java
index c8895c2..c0edde9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java
@@ -164,12 +164,11 @@ protected void initialize(Table table,
 
     if (table.isPartitioned()) {
       try {
-        String partName = Warehouse.makePartName(table.getPartCols(), tPartition.getValues());
         if (tPartition.getSd().getLocation() == null) {
           // set default if location is not set and this is a physical
           // table partition (not a view partition)
           if (table.getDataLocation() != null) {
-            Path partPath = new Path(table.getDataLocation(), partName);
+            Path partPath = new Path(table.getDataLocation(), Warehouse.makePartName(table.getPartCols(), tPartition.getValues()));
             tPartition.getSd().setLocation(partPath.toString());
           }
         }
-- 
1.7.12.4 (Apple Git-37)
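
The heart of this patch is the new create-or-alter flow at the tail of loadPartition(): instead of re-reading the partition back from the metastore via getPartition() after every load, the partition object is assembled in memory and persisted with a single metastore call. Below is a minimal sketch of that flow, not Hive's actual method: the class, method name, and the msc/conf/oldPart/newTPart/newPartPath parameters are stand-ins for the patch's local state, while the helper calls (populateQuickStats, getFileStatusRecurse, add_partition, alter_partition) are the same ones the patch itself uses.

import java.util.HashMap;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.HiveStatsUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;

class CreateOrAlterSketch {
  // Hypothetical condensed view of the patched loadPartition() tail end.
  Partition commitPartition(IMetaStoreClient msc, HiveConf conf, Table tbl,
      Partition oldPart, Partition newTPart, Path newPartPath) throws Exception {
    if (oldPart == null) {
      // Brand-new partition: compute quick stats from the freshly written
      // files locally, then register everything in one add_partition call.
      newTPart.getTPartition().setParameters(new HashMap<String, String>());
      FileStatus[] files = HiveStatsUtils.getFileStatusRecurse(
          newPartPath, -1, newPartPath.getFileSystem(conf));
      MetaStoreUtils.populateQuickStats(files, newTPart.getParameters());
      msc.add_partition(newTPart.getTPartition());
    } else {
      // Existing partition: a single alter_partition refreshes its metadata;
      // no getPartition() round trip to re-read state we already hold.
      msc.alter_partition(tbl.getDbName(), tbl.getTableName(), newTPart.getTPartition());
    }
    return newTPart;
  }
}

This one-call-per-partition shape is also why the expected event count in TestDbNotificationListener drops from 25 to 19: each load no longer fires the extra ALTER_PARTITION notifications that the old fetch-then-alter sequence produced.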
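Relatedly, the null guards added to StatsSetupConst matter because a partition built purely in memory may not have a parameters map yet. A hypothetical caller, safe under the patched behavior when marking stats stale (the class and method here are illustrative, not from Hive):

import java.util.Map;

import org.apache.hadoop.hive.common.StatsSetupConst;

class StatsGuardSketch {
  // `params` may be null for a partition assembled in memory; after this
  // patch, both calls below are no-ops rather than NPEs in that case.
  static void markStatsStale(Map<String, String> params) {
    StatsSetupConst.setBasicStatsState(params, StatsSetupConst.FALSE);
    StatsSetupConst.clearColumnStatsState(params);
  }
}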