diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index b19c1aa..e307f06 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -1270,7 +1270,6 @@ public void testEventTypesForDynamicAddPartitionByInsert() throws IOException {
     String[] ptn_data = new String[]{ "ten"};
     run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data[0] + "')", driver);
-    run("DROP TABLE " + dbName + ".ptned", driver);
 
     // Inject a behaviour where it throws exception if an INSERT event is found
     // As we dynamically add a partition through INSERT INTO cmd, it should just add ADD_PARTITION
@@ -1307,7 +1306,7 @@ public NotificationEventResponse apply(@Nullable NotificationEventResponse event
     eventTypeValidator.assertInjectionsPerformed(true,false);
     InjectableBehaviourObjectStore.resetGetNextNotificationBehaviour(); // reset the behaviour
 
-    verifyIfTableNotExist(replDbName , "ptned", metaStoreClientMirror);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=1)", ptn_data, driverMirror);
   }
 
   @Test
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 7f3460f..fc2bf8c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -354,7 +354,8 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
     return loadTableTask;
   }
 
-  private static Task createTableTask(ImportTableDesc tableDesc, EximUtil.SemanticAnalyzerWrapperContext x){
+  private static Task createTableTask(ImportTableDesc tableDesc,
+      EximUtil.SemanticAnalyzerWrapperContext x){
     return tableDesc.getCreateTableTask(x.getInputs(), x.getOutputs(), x.getConf());
   }
 
@@ -367,26 +368,26 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
     return tableDesc.getCreateTableTask(x.getInputs(), x.getOutputs(), x.getConf());
   }
 
-  private static Task alterSinglePartition(
-      URI fromURI, FileSystem fs, ImportTableDesc tblDesc,
-      Table table, Warehouse wh, AddPartitionDesc addPartitionDesc,
-      ReplicationSpec replicationSpec, org.apache.hadoop.hive.ql.metadata.Partition ptn,
-      EximUtil.SemanticAnalyzerWrapperContext x) {
+  private static Task alterSinglePartition(ImportTableDesc tblDesc,
+      Table table, Warehouse wh,
+      AddPartitionDesc addPartitionDesc,
+      ReplicationSpec replicationSpec,
+      org.apache.hadoop.hive.ql.metadata.Partition ptn,
+      EximUtil.SemanticAnalyzerWrapperContext x)
+      throws MetaException, IOException, HiveException {
     addPartitionDesc.setReplaceMode(true);
-    if ((replicationSpec != null) && (replicationSpec.isInReplicationScope())){
-      addPartitionDesc.setReplicationSpec(replicationSpec);
-    }
-    addPartitionDesc.getPartition(0).setLocation(ptn.getLocation()); // use existing location
-    return TaskFactory.get(new DDLWork(
-        x.getInputs(),
-        x.getOutputs(),
-        addPartitionDesc
-    ), x.getConf());
+    AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0);
+    if (ptn == null) {
+      fixLocationInPartSpec(tblDesc, table, wh, replicationSpec, partSpec, x);
+    } else {
+      partSpec.setLocation(ptn.getLocation()); // use existing location
+    }
+    return TaskFactory.get(new DDLWork(x.getInputs(), x.getOutputs(), addPartitionDesc), x.getConf());
   }
 
-  private static Task addSinglePartition(URI fromURI, FileSystem fs, ImportTableDesc tblDesc,
-      Table table, Warehouse wh,
-      AddPartitionDesc addPartitionDesc, ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x)
+  private static Task addSinglePartition(ImportTableDesc tblDesc,
+      Table table, Warehouse wh, AddPartitionDesc addPartitionDesc,
+      ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x)
       throws MetaException, IOException, HiveException {
     AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0);
     if (tblDesc.isExternal() && tblDesc.getLocation() == null) {
@@ -398,7 +399,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
       return addPartTask;
     } else {
       String srcLocation = partSpec.getLocation();
-      fixLocationInPartSpec(fs, tblDesc, table, wh, replicationSpec, partSpec, x);
+      fixLocationInPartSpec(tblDesc, table, wh, replicationSpec, partSpec, x);
       x.getLOG().debug("adding dependent CopyWork/AddPart/MoveWork for partition "
          + partSpecToString(partSpec.getPartSpec())
          + " with source location: " + srcLocation);
@@ -426,7 +427,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
    * Helper method to set location properly in partSpec
    */
   private static void fixLocationInPartSpec(
-      FileSystem fs, ImportTableDesc tblDesc, Table table,
+      ImportTableDesc tblDesc, Table table,
       Warehouse wh, ReplicationSpec replicationSpec,
       AddPartitionDesc.OnePartitionDesc partSpec,
       EximUtil.SemanticAnalyzerWrapperContext x) throws MetaException, HiveException, IOException {
@@ -722,7 +723,7 @@ private static void createRegularImportTasks(
         org.apache.hadoop.hive.ql.metadata.Partition ptn = null;
         if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) {
           x.getTasks().add(addSinglePartition(
-              fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
+              tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
         } else {
           throw new SemanticException(
               ErrorMsg.PARTITION_EXISTS.getMsg(partSpecToString(partSpec)));
@@ -753,7 +754,7 @@ private static void createRegularImportTasks(
       if (isPartitioned(tblDesc)) {
         for (AddPartitionDesc addPartitionDesc : partitionDescs) {
           t.addDependentTask(
-              addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
+              addSinglePartition(tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
         }
       } else {
         x.getLOG().debug("adding dependent CopyWork/MoveWork for table");
@@ -855,22 +856,28 @@ private static void createReplImportTasks(
         lockType = WriteEntity.WriteType.DDL_SHARED;
       }
 
-      Task t = createTableTask(tblDesc, x);
+      Task t = createTableTask(tblDesc, x);
       table = new Table(tblDesc.getDatabaseName(), tblDesc.getTableName());
 
-      if (!replicationSpec.isMetadataOnly()) {
-        if (isPartitioned(tblDesc)) {
-          for (AddPartitionDesc addPartitionDesc : partitionDescs) {
-            addPartitionDesc.setReplicationSpec(replicationSpec);
-            t.addDependentTask(
-                addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
-            if (updatedMetadata != null) {
-              updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec());
-            }
+      if (isPartitioned(tblDesc)) {
+        for (AddPartitionDesc addPartitionDesc : partitionDescs) {
+          addPartitionDesc.setReplicationSpec(replicationSpec);
+          if (replicationSpec.isMetadataOnly()) {
+            t.addDependentTask(alterSinglePartition(
+                tblDesc, table, wh, addPartitionDesc, replicationSpec, null, x));
+          } else {
+            t.addDependentTask(addSinglePartition(
+                tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
          }
-        } else {
+          if (updatedMetadata != null) {
+            updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec());
+          }
+        }
+      } else {
+        if (!replicationSpec.isMetadataOnly()) {
          x.getLOG().debug("adding dependent CopyWork/MoveWork for table");
-          t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()),replicationSpec, x));
+          t.addDependentTask(loadTable(fromURI, table, true,
+              new Path(tblDesc.getLocation()), replicationSpec, x));
        }
      }
      // Simply create
@@ -882,37 +889,28 @@ private static void createReplImportTasks(
        for (AddPartitionDesc addPartitionDesc : partitionDescs) {
          addPartitionDesc.setReplicationSpec(replicationSpec);
          Map partSpec = addPartitionDesc.getPartition(0).getPartSpec();
-          org.apache.hadoop.hive.ql.metadata.Partition ptn = null;
-
-          if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) {
-            if (!replicationSpec.isMetadataOnly()){
+          org.apache.hadoop.hive.ql.metadata.Partition ptn
+              = x.getHive().getPartition(table, partSpec, false);
+
+          if ((ptn == null) || replicationSpec.allowReplacementInto(ptn.getParameters())) {
+            // If replicating, then the partition already existing means we need to replace, maybe,
+            // if the destination ptn's repl.last.id is older than the replacement's.
+            if (replicationSpec.isMetadataOnly()) {
+              x.getTasks().add(alterSinglePartition(
+                  tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn, x));
+            } else {
              x.getTasks().add(addSinglePartition(
-                  fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
-              if (updatedMetadata != null) {
-                updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec());
-              }
+                  tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
            }
-          } else {
-            // If replicating, then the partition already existing means we need to replace, maybe, if
-            // the destination ptn's repl.last.id is older than the replacement's.
-            if (replicationSpec.allowReplacementInto(ptn.getParameters())){
-              if (!replicationSpec.isMetadataOnly()){
-                x.getTasks().add(addSinglePartition(
-                    fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
-              } else {
-                x.getTasks().add(alterSinglePartition(
-                    fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn, x));
-              }
-              if (updatedMetadata != null) {
-                updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec());
-              }
-              if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){
-                lockType = WriteEntity.WriteType.DDL_SHARED;
-              }
+            if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){
+              lockType = WriteEntity.WriteType.DDL_SHARED;
            }
          }
+          if (updatedMetadata != null) {
+            updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec());
+          }
        }
-        if (replicationSpec.isMetadataOnly() && partitionDescs.isEmpty()){
+        if (replicationSpec.isMetadataOnly() && partitionDescs.isEmpty()) {
          // MD-ONLY table alter
          x.getTasks().add(alterTableTask(tblDesc, x,replicationSpec));
          if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){
@@ -921,11 +919,11 @@ private static void createReplImportTasks(
        }
      } else {
        x.getLOG().debug("table non-partitioned");
-        if (!replicationSpec.isMetadataOnly()) {
+        if (replicationSpec.isMetadataOnly()) {
+          x.getTasks().add(alterTableTask(tblDesc, x, replicationSpec));
+        } else {
          // repl-imports are replace-into unless the event is insert-into
          loadTable(fromURI, table, replicationSpec.isReplace(), new Path(fromURI), replicationSpec, x);
-        } else {
-          x.getTasks().add(alterTableTask(tblDesc, x, replicationSpec));
        }
        if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){
          lockType = WriteEntity.WriteType.DDL_SHARED;
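
Note (not part of the patch itself): the behavioural core of the ImportSemanticAnalyzer change is that createReplImportTasks() now schedules an alter-partition task for metadata-only replication events and an add-partition task for events that carry data, and it only touches an already existing partition when replicationSpec.allowReplacementInto() permits it. Below is a minimal, self-contained Java sketch of that decision, under the assumption that the three inputs can be reduced to booleans; ReplImportPlanner, choosePartitionTask and PartitionTask are invented names used purely for illustration and are not Hive APIs.

// Hypothetical, simplified model of the partition-import decision in createReplImportTasks().
// The real code schedules Hive Task objects via addSinglePartition()/alterSinglePartition();
// this sketch only mirrors the control flow introduced by the patch.
public class ReplImportPlanner {

  enum PartitionTask { ADD_PARTITION, ALTER_PARTITION, SKIP }

  static PartitionTask choosePartitionTask(boolean partitionExists,
      boolean replacementAllowed, boolean metadataOnly) {
    // An existing partition is replaced only if its repl.last.id permits replacement.
    if (partitionExists && !replacementAllowed) {
      return PartitionTask.SKIP;
    }
    // Metadata-only events re-register the partition (alter); data events copy and add it.
    return metadataOnly ? PartitionTask.ALTER_PARTITION : PartitionTask.ADD_PARTITION;
  }

  public static void main(String[] args) {
    System.out.println(choosePartitionTask(false, false, true));  // ALTER_PARTITION
    System.out.println(choosePartitionTask(true, true, false));   // ADD_PARTITION
    System.out.println(choosePartitionTask(true, false, false));  // SKIP
  }
}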