diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index acf570f..cfece77 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -928,7 +928,7 @@ private int createIndex(Hive db, CreateIndexDesc crtIndex) throws HiveException
         );
     if (HiveUtils.getIndexHandler(conf, crtIndex.getIndexTypeHandlerClass()).usesIndexTable()) {
       Table indexTable = db.getTable(indexTableName);
-      work.getOutputs().add(new WriteEntity(indexTable, WriteEntity.WriteType.DDL_NO_LOCK));
+      addIfAbsentByName(new WriteEntity(indexTable, WriteEntity.WriteType.DDL_NO_LOCK));
     }
     return 0;
   }
@@ -1024,7 +1024,7 @@ private int alterIndex(Hive db, AlterIndexDesc alterIndex) throws HiveException
   private int addPartitions(Hive db, AddPartitionDesc addPartitionDesc) throws HiveException {
     List<Partition> parts = db.createPartitions(addPartitionDesc);
     for (Partition part : parts) {
-      work.getOutputs().add(new WriteEntity(part, WriteEntity.WriteType.INSERT));
+      addIfAbsentByName(new WriteEntity(part, WriteEntity.WriteType.INSERT));
     }
     return 0;
   }
@@ -1058,7 +1058,7 @@ private int renamePartition(Hive db, RenamePartitionDesc renamePartitionDesc) th
         .getPartition(tbl, renamePartitionDesc.getNewPartSpec(), false);
     work.getInputs().add(new ReadEntity(oldPart));
     // We've already obtained a lock on the table, don't lock the partition too
-    work.getOutputs().add(new WriteEntity(newPart, WriteEntity.WriteType.DDL_NO_LOCK));
+    addIfAbsentByName(new WriteEntity(newPart, WriteEntity.WriteType.DDL_NO_LOCK));
     return 0;
   }
@@ -1150,7 +1150,7 @@ private int alterTableAlterPart(Hive db, AlterTableAlterPartDesc alterPartitionD
     work.getInputs().add(new ReadEntity(tbl));
     // We've already locked the table as the input, don't relock it as the output.
-    work.getOutputs().add(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
+    addIfAbsentByName(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
     return 0;
   }
@@ -1176,7 +1176,7 @@ private int touch(Hive db, AlterTableSimpleDesc touchDesc)
        throw new HiveException("Uable to update table");
      }
      work.getInputs().add(new ReadEntity(tbl));
-     work.getOutputs().add(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
+     addIfAbsentByName(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
    } else {
      Partition part = db.getPartition(tbl, touchDesc.getPartSpec(), false);
      if (part == null) {
@@ -1188,7 +1188,7 @@ private int touch(Hive db, AlterTableSimpleDesc touchDesc)
        throw new HiveException(e);
      }
      work.getInputs().add(new ReadEntity(part));
-     work.getOutputs().add(new WriteEntity(part, WriteEntity.WriteType.DDL_NO_LOCK));
+     addIfAbsentByName(new WriteEntity(part, WriteEntity.WriteType.DDL_NO_LOCK));
    }
    return 0;
  }
@@ -3388,14 +3388,46 @@ private int alterTable(Hive db, AlterTableDesc alterTbl) throws HiveException {
     if (allPartitions != null ) {
       for (Partition tmpPart: allPartitions) {
         work.getInputs().add(new ReadEntity(tmpPart));
-        work.getOutputs().add(new WriteEntity(tmpPart, WriteEntity.WriteType.DDL_NO_LOCK));
+        addIfAbsentByName(new WriteEntity(tmpPart, WriteEntity.WriteType.DDL_NO_LOCK));
       }
     } else {
       work.getInputs().add(new ReadEntity(oldTbl));
-      work.getOutputs().add(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
+      addIfAbsentByName(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
     }
     return 0;
   }
+  /**
+   * There are many places where "duplicate" Read/WriteEntity objects are added.  The way this was
+   * initially implemented, the duplicate just replaced the previous object (work.getOutputs() is a
+   * Set and WriteEntity#equals() relies on name).
+   * This may be benign for ReadEntity and perhaps was benign for WriteEntity before WriteType was
+   * added.  Now that WriteEntity has a WriteType, the duplicate replaces the original with one that
+   * may have a different {@link org.apache.hadoop.hive.ql.hooks.WriteEntity.WriteType}.  It's hard
+   * to imagine how this is desirable.
+   *
+   * As of HIVE-14993, WriteEntity objects with different WriteTypes must be considered different.
+   * So a WriteEntity created in DDLTask would cause extra output in golden files, but only because
+   * DDLTask sets a different WriteType for the same Entity.
+   *
+   * In the spirit of bug-for-bug compatibility, this method ensures we only add a new WriteEntity
+   * if it's really new.
+   *
+   * @return {@code true} if the item was added
+   */
+  static boolean addIfAbsentByName(WriteEntity newWriteEntity, Set<WriteEntity> outputs) {
+    for (WriteEntity writeEntity : outputs) {
+      if (writeEntity.getName().equalsIgnoreCase(newWriteEntity.getName())) {
+        LOG.debug("Ignoring request to add " + newWriteEntity.toStringDetail() + " because " +
+          writeEntity.toStringDetail() + " is present");
+        return false;
+      }
+    }
+    outputs.add(newWriteEntity);
+    return true;
+  }
+  private boolean addIfAbsentByName(WriteEntity newWriteEntity) {
+    return addIfAbsentByName(newWriteEntity, work.getOutputs());
+  }

   private boolean isSchemaEvolutionEnabled(Table tbl) {
     boolean isAcid = AcidUtils.isTablePropertyTransactional(tbl.getMetadata());
@@ -3807,7 +3839,7 @@ private void dropPartitions(Hive db, Table tbl, DropTableDesc dropTbl) throws Hi
     for (Partition partition : droppedParts) {
       console.printInfo("Dropped the partition " + partition.getName());
       // We have already locked the table, don't lock the partitions.
-      work.getOutputs().add(new WriteEntity(partition, WriteEntity.WriteType.DDL_NO_LOCK));
+      addIfAbsentByName(new WriteEntity(partition, WriteEntity.WriteType.DDL_NO_LOCK));
     };
   }
@@ -3900,7 +3932,7 @@ private void dropTable(Hive db, Table tbl, DropTableDesc dropTbl) throws HiveExc
     db.dropTable(dropTbl.getTableName(), dropTbl.getIfPurge());
     if (tbl != null) {
       // We have already locked the table in DDLSemanticAnalyzer, don't do it again here
-      work.getOutputs().add(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
+      addIfAbsentByName(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
     }
   }
@@ -4067,7 +4099,7 @@ private int createTable(Hive db, CreateTableDesc crtTbl) throws HiveException {
           );
       }
     }
-    work.getOutputs().add(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
+    addIfAbsentByName(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
     return 0;
   }
@@ -4215,7 +4247,7 @@ private int createTableLike(Hive db, CreateTableLikeDesc crtTbl) throws Exceptio
     // create the table
     db.createTable(tbl, crtTbl.getIfNotExists());
-    work.getOutputs().add(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
+    addIfAbsentByName(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
     return 0;
   }
@@ -4258,10 +4290,10 @@ private int createView(Hive db, CreateViewDesc crtView) throws HiveException {
       } catch (InvalidOperationException e) {
         throw new HiveException(e);
       }
-      work.getOutputs().add(new WriteEntity(oldview, WriteEntity.WriteType.DDL_NO_LOCK));
+      addIfAbsentByName(new WriteEntity(oldview, WriteEntity.WriteType.DDL_NO_LOCK));
     } else {
       // This is a replace, so we need an exclusive lock
-      work.getOutputs().add(new WriteEntity(oldview, WriteEntity.WriteType.DDL_EXCLUSIVE));
+      addIfAbsentByName(new WriteEntity(oldview, WriteEntity.WriteType.DDL_EXCLUSIVE));
     }
   } else {
     // create new view
@@ -4310,7 +4342,7 @@ private int createView(Hive db, CreateViewDesc crtView) throws HiveException {
     }
     db.createTable(tbl, crtView.getIfNotExists());
-    work.getOutputs().add(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
+    addIfAbsentByName(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
   }
   return 0;
 }
@@ -4385,10 +4417,10 @@ private int exchangeTablePartition(Hive db,
     // Reuse the partition specs from dest partition since they should be the same
     work.getInputs().add(new ReadEntity(new Partition(sourceTable, partition.getSpec(), null)));
-    work.getOutputs().add(new WriteEntity(new Partition(sourceTable, partition.getSpec(), null),
+    addIfAbsentByName(new WriteEntity(new Partition(sourceTable, partition.getSpec(), null),
         WriteEntity.WriteType.DELETE));
-    work.getOutputs().add(new WriteEntity(new Partition(destTable, partition.getSpec(), null),
+    addIfAbsentByName(new WriteEntity(new Partition(destTable, partition.getSpec(), null),
         WriteEntity.WriteType.INSERT));
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index ec21cd6..8265af4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -362,9 +362,8 @@ public int execute(DriverContext driverContext) {
             work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID,
             hasFollowingStatsTask());
         if (work.getOutputs() != null) {
-          work.getOutputs().add(new WriteEntity(table,
-              (tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE :
-                  WriteEntity.WriteType.INSERT)));
+          DDLTask.addIfAbsentByName(new WriteEntity(table,
+              getWriteType(tbd, work.getLoadTableWork().getWriteType())), work.getOutputs());
         }
       } else {
         LOG.info("Partition is: " + tbd.getPartitionSpec().toString());
@@ -467,10 +466,9 @@ public int execute(DriverContext driverContext) {
           }
           WriteEntity enty = new WriteEntity(partn,
-              (tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE :
-                  WriteEntity.WriteType.INSERT));
+              getWriteType(tbd, work.getLoadTableWork().getWriteType()));
           if (work.getOutputs() != null) {
-            work.getOutputs().add(enty);
+            DDLTask.addIfAbsentByName(enty, work.getOutputs());
           }
           // Need to update the queryPlan's output as well so that post-exec hook get executed.
           // This is only needed for dynamic partitioning since for SP the the WriteEntity is
@@ -515,9 +513,8 @@ public int execute(DriverContext driverContext) {
           dc = new DataContainer(table.getTTable(), partn.getTPartition());
           // add this partition to post-execution hook
           if (work.getOutputs() != null) {
-            work.getOutputs().add(new WriteEntity(partn,
-                (tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE
-                    : WriteEntity.WriteType.INSERT)));
+            DDLTask.addIfAbsentByName(new WriteEntity(partn,
+                getWriteType(tbd, work.getLoadTableWork().getWriteType())), work.getOutputs());
           }
         }
       }
@@ -552,7 +549,24 @@ public int execute(DriverContext driverContext) {
       return (1);
     }
   }
-
+  /**
+   * Make sure we create the WriteEntity with the right WriteType.  This is (at this point) only
+   * for consistency, since the LockManager (which is the only thing that pays attention to
+   * WriteType) has done its job before the query ran.
+   */
+  WriteEntity.WriteType getWriteType(LoadTableDesc tbd, AcidUtils.Operation operation) {
+    if (tbd.getReplace()) {
+      return WriteEntity.WriteType.INSERT_OVERWRITE;
+    }
+    switch (operation) {
+      case DELETE:
+        return WriteEntity.WriteType.DELETE;
+      case UPDATE:
+        return WriteEntity.WriteType.UPDATE;
+      default:
+        return WriteEntity.WriteType.INSERT;
+    }
+  }
   private boolean isSkewedStoredAsDirs(LoadTableDesc tbd) {
     return (tbd.getLbCtx() == null) ? false : tbd.getLbCtx()
         .isSkewedStoredAsDir();
diff --git ql/src/java/org/apache/hadoop/hive/ql/hooks/Entity.java ql/src/java/org/apache/hadoop/hive/ql/hooks/Entity.java
index 174b5a8..0842066 100644
--- ql/src/java/org/apache/hadoop/hive/ql/hooks/Entity.java
+++ ql/src/java/org/apache/hadoop/hive/ql/hooks/Entity.java
@@ -79,7 +79,7 @@
    * This is derived from t and p, but we need to serialize this field to make
    * sure Entity.hashCode() does not need to recursively read into t and p.
    */
-  private String name;
+  private final String name;

   /**
    * Whether the output is complete or not. For eg, for dynamic partitions, the
@@ -99,10 +99,6 @@ public String getName() {
     return name;
   }

-  public void setName(String name) {
-    this.name = name;
-  }
-
   public Database getDatabase() {
     return database;
   }
@@ -162,6 +158,7 @@ public void setFunctionName(String funcName) {
    * Only used by serialization.
    */
   public Entity() {
+    name = null;
   }

   /**
@@ -326,7 +323,7 @@ public boolean isDummy() {
    */
   @Override
   public String toString() {
-    return name;
+    return getName();
   }

   private String computeName() {
@@ -360,7 +357,7 @@ public boolean equals(Object o) {
     if (o instanceof Entity) {
       Entity ore = (Entity) o;
-      return (toString().equalsIgnoreCase(ore.toString()));
+      return (getName().equalsIgnoreCase(ore.getName()));
     } else {
       return false;
     }
   }
@@ -371,7 +368,7 @@ public boolean equals(Object o) {
    */
   @Override
   public int hashCode() {
-    return toString().hashCode();
+    return getName().hashCode();
   }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
index fccb243..3d7de69 100644
--- ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
+++ ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
@@ -148,7 +148,7 @@ public boolean equals(Object o) {
     if (o instanceof ReadEntity) {
       ReadEntity ore = (ReadEntity) o;
-      return (toString().equalsIgnoreCase(ore.toString()));
+      return (getName().equalsIgnoreCase(ore.getName()));
     } else {
       return false;
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
index 2194a6d..9e18638 100644
--- ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
+++ ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
@@ -168,12 +168,16 @@ public boolean equals(Object o) {
     if (o instanceof WriteEntity) {
       WriteEntity ore = (WriteEntity) o;
-      return (toString().equalsIgnoreCase(ore.toString()));
+      return (getName().equalsIgnoreCase(ore.getName())) && this.writeType == ore.writeType;
     } else {
       return false;
     }
   }

+  public String toStringDetail() {
+    return "WriteEntity(" + toString() + ") Type=" + getType() + " WriteType=" + getWriteType();
+  }
+
   public boolean isTempURI() {
     return isTempURI;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 9d58193..9db8a22 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -6673,7 +6673,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
             new DummyPartition(dest_tab, dest_tab.getDbName()
                 + "@" + dest_tab.getTableName() + "@" + ppath, partSpec);
-        output = new WriteEntity(p, WriteEntity.WriteType.INSERT, false);
+        output = new WriteEntity(p, getWriteType(), false);
         outputs.add(output);
       } catch (HiveException e) {
         throw new SemanticException(e.getMessage(), e);
       }
@@ -6746,9 +6746,9 @@
       ltd.setLbCtx(lbCtx);
       loadTableWork.add(ltd);
-      if (!outputs.add(new WriteEntity(dest_part, (ltd.getReplace() ?
-          WriteEntity.WriteType.INSERT_OVERWRITE :
-          WriteEntity.WriteType.INSERT)))) {
+
+      if (!outputs.add(new WriteEntity(dest_part,
+          determineWriteType(ltd, dest_tab.isNonNative())))) {
         throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
             .getMsg(dest_tab.getTableName() + "@" + dest_part.getName()));
       }
@@ -13034,8 +13034,11 @@ private void addAlternateGByKeyMappings(ASTNode gByExpr, ColumnInfo colInfo,
     // and don't have a rational way to guess, so assume the most
     // conservative case.
     if (isNonNativeTable) return WriteEntity.WriteType.INSERT_OVERWRITE;
-    else return (ltd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE :
-        WriteEntity.WriteType.INSERT);
+    else return (ltd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE : getWriteType());
+  }
+  private WriteEntity.WriteType getWriteType() {
+    return updating() ? WriteEntity.WriteType.UPDATE :
+        (deleting() ? WriteEntity.WriteType.DELETE : WriteEntity.WriteType.INSERT);
+  }

   private boolean isAcidOutputFormat(Class of) {
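
Note on the Entity/WriteEntity changes above: Entity.hashCode() stays name-based while the patched WriteEntity.equals() also compares WriteType, so a plain Set can now hold two outputs for the same table, which is exactly the "extra output in golden files" situation the addIfAbsentByName() javadoc describes. Below is a minimal, self-contained sketch of that interaction; FakeWriteEntity and the enum are simplified stand-ins for the real org.apache.hadoop.hive.ql.hooks classes, not the actual Hive types.

import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

public class WriteEntityDedupSketch {

  enum WriteType { INSERT, INSERT_OVERWRITE, UPDATE, DELETE, DDL_NO_LOCK, DDL_EXCLUSIVE }

  /** Simplified stand-in: hashCode() is name-only (as in Entity), equals() also compares WriteType. */
  static final class FakeWriteEntity {
    final String name;
    final WriteType writeType;

    FakeWriteEntity(String name, WriteType writeType) {
      this.name = name;
      this.writeType = writeType;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof FakeWriteEntity)) {
        return false;
      }
      FakeWriteEntity other = (FakeWriteEntity) o;
      return name.equalsIgnoreCase(other.name) && writeType == other.writeType;
    }

    @Override
    public int hashCode() {
      return name.hashCode(); // name only; the patch keeps hashCode() name-based
    }

    @Override
    public String toString() {
      return name + "/" + writeType;
    }
  }

  /** Same idea as the patched DDLTask.addIfAbsentByName(): the first entity registered for a name wins. */
  static boolean addIfAbsentByName(FakeWriteEntity candidate, Set<FakeWriteEntity> outputs) {
    for (FakeWriteEntity existing : outputs) {
      if (existing.name.equalsIgnoreCase(candidate.name)) {
        return false; // same name already present, ignore the later WriteType
      }
    }
    return outputs.add(candidate);
  }

  public static void main(String[] args) {
    // A plain Set no longer deduplicates: the two entries hash together by name,
    // but equals() sees different WriteTypes, so both survive.
    Set<FakeWriteEntity> plainSet = new HashSet<>();
    plainSet.add(new FakeWriteEntity("default@t1", WriteType.INSERT));
    plainSet.add(new FakeWriteEntity("default@t1", WriteType.DDL_NO_LOCK));
    System.out.println("plain Set: " + plainSet.size() + " entries");

    // With the name-based guard, only the first registration is kept.
    Set<FakeWriteEntity> outputs = new LinkedHashSet<>();
    addIfAbsentByName(new FakeWriteEntity("default@t1", WriteType.INSERT), outputs);
    addIfAbsentByName(new FakeWriteEntity("default@t1", WriteType.DDL_NO_LOCK), outputs);
    System.out.println("guarded outputs: " + outputs);
  }
}

Running the sketch prints two entries for the plain Set and a single INSERT entry once the name-based check is applied, mirroring what the patched DDLTask does against work.getOutputs().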
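Similarly, the new MoveTask#getWriteType() is a small decision table: an insert overwrite (tbd.getReplace()) wins outright, otherwise the ACID operation picks the WriteType. A hedged sketch of the same precedence, again with simplified enums standing in for AcidUtils.Operation and WriteEntity.WriteType rather than the real Hive classes:

public class MoveTaskWriteTypeSketch {

  /** Simplified stand-ins for AcidUtils.Operation and WriteEntity.WriteType. */
  enum Operation { NOT_ACID, INSERT, UPDATE, DELETE }
  enum WriteType { INSERT, INSERT_OVERWRITE, UPDATE, DELETE }

  /** Mirrors the precedence in the patched MoveTask.getWriteType(): replace first, then the ACID op. */
  static WriteType getWriteType(boolean replace, Operation operation) {
    if (replace) {
      return WriteType.INSERT_OVERWRITE;
    }
    switch (operation) {
      case DELETE:
        return WriteType.DELETE;
      case UPDATE:
        return WriteType.UPDATE;
      default:
        return WriteType.INSERT; // NOT_ACID and plain INSERT both land here
    }
  }

  public static void main(String[] args) {
    System.out.println(getWriteType(true, Operation.NOT_ACID));  // INSERT_OVERWRITE
    System.out.println(getWriteType(false, Operation.UPDATE));   // UPDATE
    System.out.println(getWriteType(false, Operation.DELETE));   // DELETE
    System.out.println(getWriteType(false, Operation.NOT_ACID)); // INSERT
  }
}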