diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java index c25e6e2159..09d7e0f5ef 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java @@ -220,9 +220,10 @@ public void externalTableReplicationWithCustomPaths() throws Throwable { // Create base directory but use HDFS path without schema or authority details. // Hive should pick up the local cluster's HDFS schema/authority. externalTableBasePathWithClause(); - List loadWithClause = Collections.singletonList( + List loadWithClause = Arrays.asList( "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" - + REPLICA_EXTERNAL_BASE + "'" + + REPLICA_EXTERNAL_BASE + "'", + "'distcp.options.update'=''" ); WarehouseInstance.Tuple bootstrapTuple = primary.run("use " + primaryDbName) @@ -359,19 +360,62 @@ public void externalTableIncrementalReplication() throws Throwable { WarehouseInstance.Tuple tuple = primary.dump("repl dump " + primaryDbName); replica.load(replicatedDbName, tuple.dumpLocation); + Path externalTableLocation = + new Path("/" + testName.getMethodName() + "/t1/"); + DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem(); + fs.mkdirs(externalTableLocation, new FsPermission("777")); + tuple = primary.run("use " + primaryDbName) - .run("create external table t1 (place string) partitioned by (country string)") + .run("create external table t1 (place string) partitioned by (country string) row format " + + "delimited fields terminated by ',' location '" + externalTableLocation.toString() + + "'") .run("alter table t1 add partition(country='india')") .run("alter table t1 add partition(country='us')") .dump(primaryDbName, 
tuple.lastReplicationId); + assertExternalFileInfo(Collections.singletonList("t1"), new Path(tuple.dumpLocation, FILE_NAME)); + + // Add new data externally, to a partition, but under the partition level top directory + // Also, it is added after dumping the events but data should be seen at target after REPL LOAD. + Path partitionDir = new Path(externalTableLocation, "country=india"); + try (FSDataOutputStream outputStream = fs.create(new Path(partitionDir, "file.txt"))) { + outputStream.write("pune\n".getBytes()); + outputStream.write("mumbai\n".getBytes()); + } + + try (FSDataOutputStream outputStream = fs.create(new Path(partitionDir, "file1.txt"))) { + outputStream.write("bangalore\n".getBytes()); + } + List loadWithClause = externalTableBasePathWithClause(); replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) .run("use " + replicatedDbName) .run("show tables like 't1'") .verifyResult("t1") .run("show partitions t1") - .verifyResults(new String[] { "country=india", "country=us" }); + .verifyResults(new String[] { "country=india", "country=us" }) + .run("select place from t1 order by place") + .verifyResults(new String[] { "bangalore", "mumbai", "pune" }); + + // Delete one of the files and update the other one. + fs.delete(new Path(partitionDir, "file.txt"), true); + fs.delete(new Path(partitionDir, "file1.txt"), true); + try (FSDataOutputStream outputStream = fs.create(new Path(partitionDir, "file1.txt"))) { + outputStream.write("chennai\n".getBytes()); + } + + // Repl load with zero events but external tables location info should be present. 
+ tuple = primary.dump(primaryDbName, tuple.lastReplicationId); + assertExternalFileInfo(Collections.singletonList("t1"), new Path(tuple.dumpLocation, FILE_NAME)); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyResult("t1") + .run("show partitions t1") + .verifyResults(new String[] { "country=india", "country=us" }) + .run("select place from t1 order by place") + .verifyResults(new String[] { "chennai" }); Hive hive = Hive.get(replica.getConf()); Set partitions = @@ -391,7 +435,6 @@ public void externalTableIncrementalReplication() throws Throwable { for (String path : paths) { assertTrue(replica.miniDFSCluster.getFileSystem().exists(new Path(path))); } - } @Test @@ -492,9 +535,10 @@ public void bootstrapExternalTablesDuringIncrementalPhase() throws Throwable { fileSystem.mkdirs(externalTableLocation); // this is required since the same filesystem is used in both source and target - return Collections.singletonList( - "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" - + externalTableLocation.toString() + "'" + return Arrays.asList( + "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + + externalTableLocation.toString() + "'", + "'distcp.options.pugpb'=''" ); } diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java index b6f70ebe63..e774419eb8 100644 --- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java +++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java @@ -1110,20 +1110,30 @@ public void setStoragePolicy(Path path, StoragePolicyValue policy) private static final String DISTCP_OPTIONS_PREFIX = "distcp.options."; List constructDistCpParams(List srcPaths, Path dst, Configuration conf) { + // -update and -delete are mandatory options for directory copy to work. 
+ // -pbx is default preserve options if user doesn't pass any. List params = new ArrayList(); + boolean needToAddPreserveOption = true; for (Map.Entry entry : conf.getPropsWithPrefix(DISTCP_OPTIONS_PREFIX).entrySet()){ String distCpOption = entry.getKey(); String distCpVal = entry.getValue(); + if (distCpOption.startsWith("p")) { + needToAddPreserveOption = false; + } params.add("-" + distCpOption); if ((distCpVal != null) && (!distCpVal.isEmpty())){ params.add(distCpVal); } } - if (params.size() == 0){ - // if no entries were added via conf, we initiate our defaults - params.add("-update"); + if (needToAddPreserveOption) { params.add("-pbx"); } + if (!params.contains("-update")) { + params.add("-update"); + } + if (!params.contains("-delete")) { + params.add("-delete"); + } for (Path src : srcPaths) { params.add(src.toString()); } @@ -1150,10 +1160,11 @@ public Boolean run() throws Exception { @Override public boolean runDistCp(List srcPaths, Path dst, Configuration conf) throws IOException { DistCpOptions options = new DistCpOptions.Builder(srcPaths, dst) - .withSyncFolder(true) - .withCRC(true) - .preserve(FileAttribute.BLOCKSIZE) - .build(); + .withSyncFolder(true) + .withDeleteMissing(true) + .preserve(FileAttribute.BLOCKSIZE) + .preserve(FileAttribute.XATTR) + .build(); // Creates the command-line parameters for distcp List params = constructDistCpParams(srcPaths, dst, conf); diff --git a/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java b/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java index 9a9311bdeb..efb0074271 100644 --- a/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java +++ b/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java @@ -40,40 +40,41 @@ public void testConstructDistCpParams() { Hadoop23Shims shims = new Hadoop23Shims(); List paramsDefault = shims.constructDistCpParams(Collections.singletonList(copySrc), copyDst, conf); - 
assertEquals(4, paramsDefault.size()); - assertTrue("Distcp -update set by default", paramsDefault.contains("-update")); + assertEquals(5, paramsDefault.size()); assertTrue("Distcp -pbx set by default", paramsDefault.contains("-pbx")); - assertEquals(copySrc.toString(), paramsDefault.get(2)); - assertEquals(copyDst.toString(), paramsDefault.get(3)); + assertTrue("Distcp -update set by default", paramsDefault.contains("-update")); + assertTrue("Distcp -delete set by default", paramsDefault.contains("-delete")); + assertEquals(copySrc.toString(), paramsDefault.get(3)); + assertEquals(copyDst.toString(), paramsDefault.get(4)); conf.set("distcp.options.foo", "bar"); // should set "-foo bar" conf.set("distcp.options.blah", ""); // should set "-blah" + conf.set("distcp.options.pug", ""); // should set "-pug" conf.set("dummy", "option"); // should be ignored. List paramsWithCustomParamInjection = shims.constructDistCpParams(Collections.singletonList(copySrc), copyDst, conf); - assertEquals(5, paramsWithCustomParamInjection.size()); - - // check that the defaults did not remain. - assertTrue("Distcp -update not set if not requested", - !paramsWithCustomParamInjection.contains("-update")); + assertEquals(8, paramsWithCustomParamInjection.size()); + + // check that the mandatory ones remain along with user passed ones. 
+ assertTrue("Distcp -update set even if not requested", + paramsWithCustomParamInjection.contains("-update")); + assertTrue("Distcp -delete set even if not requested", + paramsWithCustomParamInjection.contains("-delete")); + assertTrue("Distcp -foo is set as passes", + paramsWithCustomParamInjection.contains("-foo")); + assertTrue("Distcp -blah is set as passes", + paramsWithCustomParamInjection.contains("-blah")); + assertTrue("Distcp -pug is set as passes", + paramsWithCustomParamInjection.contains("-pug")); + assertTrue("Distcp -pbx not set as overridden", + !paramsWithCustomParamInjection.contains("-pbx")); assertTrue("Distcp -skipcrccheck not set if not requested", !paramsWithCustomParamInjection.contains("-skipcrccheck")); - assertTrue("Distcp -pbx not set if not requested", - !paramsWithCustomParamInjection.contains("-pbx")); - - // the "-foo bar" and "-blah" params order is not guaranteed - String firstParam = paramsWithCustomParamInjection.get(0); - if (firstParam.equals("-foo")){ - // "-foo bar -blah" form - assertEquals("bar", paramsWithCustomParamInjection.get(1)); - assertEquals("-blah", paramsWithCustomParamInjection.get(2)); - } else { - // "-blah -foo bar" form - assertEquals("-blah", paramsWithCustomParamInjection.get(0)); - assertEquals("-foo", paramsWithCustomParamInjection.get(1)); - assertEquals("bar", paramsWithCustomParamInjection.get(2)); - } + + // the "-foo bar" order is guaranteed + int idx = paramsWithCustomParamInjection.indexOf("-foo"); + assertEquals("bar", paramsWithCustomParamInjection.get(idx+1)); // the dummy option should not have made it either - only options // beginning with distcp.options. 
should be honoured @@ -82,8 +83,8 @@ public void testConstructDistCpParams() { assertTrue(!paramsWithCustomParamInjection.contains("option")); assertTrue(!paramsWithCustomParamInjection.contains("-option")); - assertEquals(copySrc.toString(), paramsWithCustomParamInjection.get(3)); - assertEquals(copyDst.toString(), paramsWithCustomParamInjection.get(4)); + assertEquals(copySrc.toString(), paramsWithCustomParamInjection.get(6)); + assertEquals(copyDst.toString(), paramsWithCustomParamInjection.get(7)); }