diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 583603ca7b..b27bc43303 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -475,6 +475,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal REPL_DUMPDIR_TTL("hive.repl.dumpdir.ttl", "7d", new TimeValidator(TimeUnit.DAYS), "TTL of dump dirs before cleanup."), + REPL_DUMP_COPY_DATA("hive.repl.dump.copydata", true, + "Indicates whether replication dump should contain actual data."), REPL_DUMP_METADATA_ONLY("hive.repl.dump.metadata.only", false, "Indicates whether replication dump only metadata information or data + metadata. \n" + "This config makes hive.repl.include.external.tables config ineffective."), diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosWithCopyData.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosWithCopyData.java new file mode 100644 index 0000000000..91ba394cee --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosWithCopyData.java @@ -0,0 +1,423 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.parse; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.MetaStoreTestUtils; +import org.apache.hadoop.hive.metastore.PersistenceManagerProvider; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; +import org.apache.hadoop.hive.ql.DriverFactory; +import org.apache.hadoop.hive.ql.IDriver; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.processors.CommandProcessorException; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.shims.Utils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; +import static org.junit.Assert.assertEquals;; + +public class TestReplicationScenariosWithCopyData { + + @Rule + public final TestName testName = new TestName(); + + private final static String DBNOTIF_LISTENER_CLASSNAME = + "org.apache.hive.hcatalog.listener.DbNotificationListener"; + // FIXME : replace with hive copy once that is copied + private final static String tid = + TestReplicationScenariosWithCopyData.class.getCanonicalName().toLowerCase().replace('.','_') + "_" + System.currentTimeMillis(); + private final static String TEST_PATH = + System.getProperty("test.warehouse.dir", "/tmp") + Path.SEPARATOR + tid; + + static HiveConf hconf; + static HiveMetaStoreClient metaStoreClient; + private static IDriver driver; + private static String proxySettingName; + private static HiveConf hconfMirror; + private static IDriver driverMirror; + private static HiveMetaStoreClient metaStoreClientMirror; + private static boolean isMigrationTest; + + // Make sure we skip backward-compat checking for those tests that don't generate events + + protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenariosWithCopyData.class); + private ArrayList lastResults; + + private final boolean VERIFY_SETUP_STEPS = false; + // if verifySetup is set to true, all the test setup we do will perform additional + // verifications as well, which is useful to verify that our setup occurred + // correctly when developing and debugging tests. These verifications, however + // do not test any new functionality for replication, and thus, are not relevant + // for testing replication itself. For steady state, we want this to be false. 
+ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + HashMap overrideProperties = new HashMap<>(); + overrideProperties.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(), + GzipJSONMessageEncoder.class.getCanonicalName()); + internalBeforeClassSetup(overrideProperties, false); + } + + static void internalBeforeClassSetup(Map additionalProperties, boolean forMigration) + throws Exception { + hconf = new HiveConf(TestReplicationScenariosWithCopyData.class); + String metastoreUri = System.getProperty("test."+MetastoreConf.ConfVars.THRIFT_URIS.getHiveName()); + if (metastoreUri != null) { + hconf.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), metastoreUri); + return; + } + isMigrationTest = forMigration; + + hconf.set(MetastoreConf.ConfVars.TRANSACTIONAL_EVENT_LISTENERS.getHiveName(), + DBNOTIF_LISTENER_CLASSNAME); // turn on db notification listener on metastore + hconf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true); + hconf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true); + hconf.setVar(HiveConf.ConfVars.REPLCMDIR, TEST_PATH + "/cmroot/"); + proxySettingName = "hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts"; + hconf.set(proxySettingName, "*"); + hconf.setVar(HiveConf.ConfVars.REPLDIR,TEST_PATH + "/hrepl/"); + hconf.set(MetastoreConf.ConfVars.THRIFT_CONNECTION_RETRIES.getHiveName(), "3"); + hconf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); + hconf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); + hconf.set(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, "true"); + hconf.setBoolVar(HiveConf.ConfVars.REPL_DUMP_COPY_DATA, true); + hconf.setBoolVar(HiveConf.ConfVars.HIVE_IN_TEST, true); + hconf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); + hconf.set(HiveConf.ConfVars.HIVE_TXN_MANAGER.varname, + "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager"); + hconf.set(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL.varname, + "org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore"); + hconf.setBoolVar(HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES, true); + hconf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, true); + hconf.setBoolVar(HiveConf.ConfVars.HIVE_STATS_RELIABLE, true); + System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " "); + System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " "); + + additionalProperties.forEach((key, value) -> { + hconf.set(key, value); + }); + + MetaStoreTestUtils.startMetaStoreWithRetry(hconf); + // re set the WAREHOUSE property to the test dir, as the previous command added a random port to it + hconf.set(MetastoreConf.ConfVars.WAREHOUSE.getVarname(), System.getProperty("test.warehouse.dir", "/tmp")); + + Path testPath = new Path(TEST_PATH); + FileSystem fs = FileSystem.get(testPath.toUri(),hconf); + fs.mkdirs(testPath); + driver = DriverFactory.newDriver(hconf); + SessionState.start(new CliSessionState(hconf)); + metaStoreClient = new HiveMetaStoreClient(hconf); + + FileUtils.deleteDirectory(new File("metastore_db2")); + HiveConf hconfMirrorServer = new HiveConf(); + hconfMirrorServer.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, "jdbc:derby:;databaseName=metastore_db2;create=true"); + MetaStoreTestUtils.startMetaStoreWithRetry(hconfMirrorServer); + hconfMirror = new HiveConf(hconf); + String thriftUri = MetastoreConf.getVar(hconfMirrorServer, MetastoreConf.ConfVars.THRIFT_URIS); + MetastoreConf.setVar(hconfMirror, MetastoreConf.ConfVars.THRIFT_URIS, thriftUri); + + if (forMigration) { + 
hconfMirror.setBoolVar(HiveConf.ConfVars.HIVE_STRICT_MANAGED_TABLES, true); + hconfMirror.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true); + hconfMirror.set(HiveConf.ConfVars.HIVE_TXN_MANAGER.varname, + "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"); + } + driverMirror = DriverFactory.newDriver(hconfMirror); + metaStoreClientMirror = new HiveMetaStoreClient(hconfMirror); + + PersistenceManagerProvider.setTwoMetastoreTesting(true); + } + + @AfterClass + public static void tearDownAfterClass(){ + } + + @Before + public void setUp(){ + // before each test + SessionState.get().setCurrentDatabase("default"); + } + + @After + public void tearDown(){ + // after each test + } + + private static int next = 0; + private synchronized void advanceDumpDir() { + next++; + ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next)); + } + + static class Tuple { + final String dumpLocation; + final String lastReplId; + + Tuple(String dumpLocation, String lastReplId) { + this.dumpLocation = dumpLocation; + this.lastReplId = lastReplId; + } + } + + private Tuple bootstrapLoadAndVerify(String dbName, String replDbName) throws IOException { + return incrementalLoadAndVerify(dbName, null, replDbName); + } + + private Tuple incrementalLoadAndVerify(String dbName, String fromReplId, String replDbName) throws IOException { + Tuple dump = replDumpDb(dbName, fromReplId, null, null); + loadAndVerify(replDbName, dump.dumpLocation, dump.lastReplId); + return dump; + } + + private Tuple replDumpDb(String dbName, String fromReplID, String toReplID, String limit) throws IOException { + advanceDumpDir(); + String dumpCmd = "REPL DUMP " + dbName; + if (null != fromReplID) { + dumpCmd = dumpCmd + " FROM " + fromReplID; + } + if (null != toReplID) { + dumpCmd = dumpCmd + " TO " + toReplID; + } + if (null != limit) { + dumpCmd = dumpCmd + " LIMIT " + limit; + } + run(dumpCmd, driver); + String dumpLocation = getResult(0, 0, driver); + String lastReplId = getResult(0, 1, true, driver); + LOG.info("Dumped to {} with id {} for command: {}", dumpLocation, lastReplId, dumpCmd); + return new Tuple(dumpLocation, lastReplId); + } + + private void loadAndVerify(String replDbName, String dumpLocation, String lastReplId) throws IOException { + run("REPL LOAD " + replDbName + " FROM '" + dumpLocation + "'", driverMirror); + verifyRun("REPL STATUS " + replDbName, lastReplId, driverMirror); + return; + } + + private abstract class checkTaskPresent { + public boolean hasTask(Task rootTask) { + if (rootTask == null) { + return false; + } + if (validate(rootTask)) { + return true; + } + List> childTasks = rootTask.getChildTasks(); + if (childTasks == null) { + return false; + } + for (Task childTask : childTasks) { + if (hasTask(childTask)) { + return true; + } + } + return false; + } + + public abstract boolean validate(Task task); + } + + @Test + public void testIncrementalLoadWithDumpData() throws IOException { + String testName = "incrementalLoad"; + String dbName = createDB(testName, driver); + String replDbName = dbName + "_dupe"; + + run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + dbName + + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE", driver); + + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); + String replDumpId = 
bootstrapDump.lastReplId; + + String[] unptn_data = new String[] { "eleven", "twelve" }; + String[] ptn_data_1 = new String[] { "thirteen", "fourteen", "fifteen" }; + String[] ptn_data_2 = new String[] { "fifteen", "sixteen", "seventeen" }; + String[] empty = new String[] {}; + + String unptn_locn = new Path(TEST_PATH, testName + "_unptn").toUri().getPath(); + String ptn_locn_1 = new Path(TEST_PATH, testName + "_ptn1").toUri().getPath(); + String ptn_locn_2 = new Path(TEST_PATH, testName + "_ptn2").toUri().getPath(); + + createTestDataFile(unptn_locn, unptn_data); + createTestDataFile(ptn_locn_1, ptn_data_1); + createTestDataFile(ptn_locn_2, ptn_data_2); + + verifySetup("SELECT a from " + dbName + ".ptned_empty", empty, driverMirror); + verifySetup("SELECT * from " + dbName + ".unptned_empty", empty, driverMirror); + + run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned", driver); + verifySetup("SELECT * from " + dbName + ".unptned", unptn_data, driver); + run("CREATE TABLE " + dbName + ".unptned_late LIKE " + dbName + ".unptned", driver); + run("INSERT INTO TABLE " + dbName + ".unptned_late SELECT * FROM " + dbName + ".unptned", driver); + verifySetup("SELECT * from " + dbName + ".unptned_late", unptn_data, driver); + + Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, replDbName); + replDumpId = incrementalDump.lastReplId; + + verifyRun("SELECT * from " + replDbName + ".unptned_late", unptn_data, driverMirror); + + run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=1)", driver); + run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + + ".ptned PARTITION(b=1)", driver); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptn_data_1, driver); + run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + + ".ptned PARTITION(b=2)", driver); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptn_data_2, driver); + + run("CREATE TABLE " + dbName + + ".ptned_late(a string) PARTITIONED BY (b int) STORED AS TEXTFILE", driver); + run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=1) SELECT a FROM " + dbName + + ".ptned WHERE b=1", driver); + verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=1", ptn_data_1, driver); + + run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=2) SELECT a FROM " + dbName + + ".ptned WHERE b=2", driver); + verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=2", ptn_data_2, driver); + + incrementalLoadAndVerify(dbName, replDumpId, replDbName); + verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=1", ptn_data_1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=2", ptn_data_2, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=1", ptn_data_1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=2", ptn_data_2, driverMirror); + } + + private static String createDB(String name, IDriver myDriver) { + LOG.info("Testing " + name); + run("CREATE DATABASE " + name + " WITH DBPROPERTIES ( '" + + SOURCE_OF_REPLICATION + "' = '1,2,3')", myDriver); + return name; + } + + private String getResult(int rowNum, int colNum, IDriver myDriver) throws IOException { + return getResult(rowNum,colNum,false, myDriver); + } + private String getResult(int rowNum, int colNum, boolean reuse, IDriver myDriver) throws IOException { + if (!reuse) { + lastResults = new ArrayList(); + myDriver.getResults(lastResults); + } + // Split around the 
'tab' character + return (lastResults.get(rowNum).split("\\t"))[colNum]; + } + + /** + * All the results that are read from the hive output will not preserve + * case sensitivity and will all be in lower case, hence we will check against + * only lower case data values. + * Unless for Null Values it actually returns in UpperCase and hence explicitly lowering case + * before assert. + */ + private void verifyResults(String[] data, IDriver myDriver) throws IOException { + List results = getOutput(myDriver); + LOG.info("Expecting {}", data); + LOG.info("Got {}", results); + assertEquals(data.length, results.size()); + for (int i = 0; i < data.length; i++) { + assertEquals(data[i].toLowerCase().trim(), results.get(i).toLowerCase().trim()); + } + } + + private List getOutput(IDriver myDriver) throws IOException { + List results = new ArrayList<>(); + myDriver.getResults(results); + return results; + } + + private void verifySetup(String cmd, String[] data, IDriver myDriver) throws IOException { + if (VERIFY_SETUP_STEPS){ + run(cmd, myDriver); + verifyResults(data, myDriver); + } + } + + private void verifyRun(String cmd, String data, IDriver myDriver) throws IOException { + verifyRun(cmd, new String[] { data }, myDriver); + } + + private void verifyRun(String cmd, String[] data, IDriver myDriver) throws IOException { + run(cmd, myDriver); + verifyResults(data, myDriver); + } + + private static void run(String cmd, IDriver myDriver) throws RuntimeException { + try { + run(cmd,false, myDriver); // default arg-less run simply runs, and does not care about failure + } catch (AssertionError ae){ + // Hive code has AssertionErrors in some cases - we want to record what happens + LOG.warn("AssertionError:",ae); + throw new RuntimeException(ae); + } + } + + private static boolean run(String cmd, boolean errorOnFail, IDriver myDriver) throws RuntimeException { + boolean success = false; + try { + myDriver.run(cmd); + success = true; + } catch (CommandProcessorException e) { + LOG.warn("Error {} : {} running [{}].", e.getErrorCode(), e.getMessage(), cmd); + } + return success; + } + + private static void createTestDataFile(String filename, String[] lines) throws IOException { + FileWriter writer = null; + try { + File file = new File(filename); + file.deleteOnExit(); + writer = new FileWriter(file); + for (String line : lines) { + writer.write(line + "\n"); + } + } finally { + if (writer != null) { + writer.close(); + } + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java index 470357af49..3ec499ff74 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java @@ -255,7 +255,7 @@ public int execute() { return 2; } // Copy the files from different source file systems to one destination directory - new CopyUtils(rwork.distCpDoAsUser(), conf, dstFs).copyAndVerify(toPath, srcFiles); + new CopyUtils(rwork.distCpDoAsUser(), conf, dstFs).copyAndVerify(toPath, srcFiles, fromPath); // If a file is copied from CM path, then need to rename them using original source file name // This is needed to avoid having duplicate files in target if same event is applied twice @@ -324,10 +324,18 @@ public String getName() { public static Task getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, HiveConf conf, boolean isAutoPurge, boolean needRecycle, boolean copyToMigratedTxnTable) { + return getLoadCopyTask(replicationSpec, 
srcPath, dstPath, conf, isAutoPurge, needRecycle, + copyToMigratedTxnTable, true); + } + + public static Task getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, + HiveConf conf, boolean isAutoPurge, boolean needRecycle, + boolean copyToMigratedTxnTable, boolean readSrcAsFilesList) { Task copyTask = null; LOG.debug("ReplCopyTask:getLoadCopyTask: {}=>{}", srcPath, dstPath); if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){ ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false); + rcwork.setReadSrcAsFilesList(readSrcAsFilesList); if (replicationSpec.isReplace() && (conf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION) || copyToMigratedTxnTable)) { rcwork.setDeleteDestIfExist(true); rcwork.setAutoPurge(isAutoPurge); @@ -341,7 +349,6 @@ public String getName() { LOG.debug("ReplCopyTask:\trcwork"); if (replicationSpec.isLazy()) { LOG.debug("ReplCopyTask:\tlazy"); - rcwork.setReadSrcAsFilesList(true); // It is assumed isLazy flag is set only for REPL LOAD flow. // IMPORT always do deep copy. So, distCpDoAsUser will be null by default in ReplCopyWork. @@ -360,4 +367,9 @@ public String getName() { HiveConf conf) { return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, false, false, false); } + + public static Task getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, + HiveConf conf, boolean readSourceAsFileList) { + return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, false, false, false, readSourceAsFileList); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 977abb74cc..aa3184c9c8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter; import org.apache.hadoop.hive.metastore.messaging.event.filters.ReplEventFilter; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.ReplCopyTask; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.io.AcidUtils; @@ -72,6 +73,7 @@ import org.slf4j.LoggerFactory; import javax.security.auth.login.LoginException; +import java.io.File; import java.io.IOException; import java.io.Serializable; import java.nio.charset.StandardCharsets; @@ -119,18 +121,19 @@ public String getName() { public int execute() { try { Hive hiveDb = getHive(); - Path dumpRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR), getNextDumpDir()); - DumpMetaData dmd = new DumpMetaData(dumpRoot, conf); + Path dumpBaseDir = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR), getNextDumpDir()); + Path hiveDumpRoot = new Path(dumpBaseDir, ReplUtils.REPL_HIVE_BASE_DIR); + DumpMetaData dmd = new DumpMetaData(hiveDumpRoot, conf); // Initialize ReplChangeManager instance since we will require it to encode file URI. 
ReplChangeManager.getInstance(conf); Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR)); Long lastReplId; if (work.isBootStrapDump()) { - lastReplId = bootStrapDump(dumpRoot, dmd, cmRoot, hiveDb); + lastReplId = bootStrapDump(hiveDumpRoot, dmd, cmRoot, hiveDb); } else { - lastReplId = incrementalDump(dumpRoot, dmd, cmRoot, hiveDb); + lastReplId = incrementalDump(hiveDumpRoot, dmd, cmRoot, hiveDb); } - prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), String.valueOf(lastReplId))); + prepareReturnValues(Arrays.asList(dumpBaseDir.toUri().toString(), String.valueOf(lastReplId))); } catch (Exception e) { LOG.error("failed", e); setException(e); @@ -305,7 +308,7 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive NotificationEvent ev = evIter.next(); lastReplId = ev.getEventId(); Path evRoot = new Path(dumpRoot, String.valueOf(lastReplId)); - dumpEvent(ev, evRoot, cmRoot, hiveDb); + dumpEvent(ev, evRoot, dumpRoot, cmRoot, hiveDb); } replLogger.endLog(lastReplId.toString()); @@ -344,8 +347,11 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive // Dump the table to be bootstrapped if required. if (shouldBootstrapDumpTable(table)) { HiveWrapper.Tuple tableTuple = new HiveWrapper(hiveDb, dbName).table(table); - dumpTable(dbName, tableName, validTxnList, dbRoot, bootDumpBeginReplId, hiveDb, + dumpTable(dbName, tableName, validTxnList, dbRoot, dumpRoot, bootDumpBeginReplId, hiveDb, tableTuple); + Path tableRoot = new Path(dbRoot, tableName); + Task copyTask = ReplCopyTask.getLoadCopyTask(new ReplicationSpec(), tableRoot, new Path(dumpRoot, ".pks/"), conf); + this.addDependentTask(copyTask); } if (tableList != null && isTableSatifiesConfig(table)) { tableList.add(tableName); @@ -385,9 +391,10 @@ private Path getBootstrapDbRoot(Path dumpRoot, String dbName, boolean isIncremen return new Path(dumpRoot, dbName); } - private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot, Hive db) throws Exception { + private void dumpEvent(NotificationEvent ev, Path evRoot, Path dumpRoot, Path cmRoot, Hive db) throws Exception { EventHandler.Context context = new EventHandler.Context( evRoot, + dumpRoot, cmRoot, db, conf, @@ -505,7 +512,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) LOG.debug("Adding table {} to external tables list", tblName); writer.dataLocationDump(tableTuple.object); } - dumpTable(dbName, tblName, validTxnList, dbRoot, bootDumpBeginReplId, hiveDb, + dumpTable(dbName, tblName, validTxnList, dbRoot, dumpRoot, bootDumpBeginReplId, hiveDb, tableTuple); } catch (InvalidTableException te) { // Bootstrap dump shouldn't fail if the table is dropped/renamed while dumping it. @@ -563,7 +570,7 @@ Path dumpDbMetadata(String dbName, Path dumpRoot, long lastReplId, Hive hiveDb) return dbRoot; } - void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, long lastReplId, + void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, Path dumproot, long lastReplId, Hive hiveDb, HiveWrapper.Tuple
tuple) throws Exception { LOG.info("Bootstrap Dump for table " + tblName); TableSpec tableSpec = new TableSpec(tuple.object); @@ -582,10 +589,20 @@ void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, } MmContext mmCtx = MmContext.createIfNeeded(tableSpec.tableHandle); tuple.replicationSpec.setRepl(true); - new TableExport( - exportPaths, tableSpec, tuple.replicationSpec, hiveDb, distCpDoAsUser, conf, mmCtx).write(); - + Path replDataDir = new Path(dumproot, EximUtil.DATA_PATH_NAME); + new TableExport(exportPaths, tableSpec, tuple.replicationSpec, hiveDb, distCpDoAsUser, conf, mmCtx).write(); + if (conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY) || + (TableType.EXTERNAL_TABLE.equals(tuple.object.getTableType()) + && (!conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES)))) { + return; + } replLogger.tableLog(tblName, tableSpec.tableHandle.getTableType()); + Path dumpDataDir = new Path(dumproot, EximUtil.DATA_PATH_NAME); + Path tblCopyPath = new Path(dumpDataDir, dbName); + tblCopyPath = new Path(tblCopyPath, tblName); + Task copyTask = ReplCopyTask.getLoadCopyTask( + tuple.replicationSpec, tuple.object.getPath(), tblCopyPath, conf, false); + this.addDependentTask(copyTask); } private String getValidWriteIdList(String dbName, String tblName, String validTxnString) throws LockException { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java index a2c467bafd..26d74129ac 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java @@ -278,7 +278,7 @@ a database ( directory ) } this.childTasks = scope.rootTasks; /* - Since there can be multiple rounds of this run all of which will be tied to the same + Since there can be multiple rounds of this run all of which will be tied to the same query id -- generated in compile phase , adding a additional UUID to the end to print each run in separate files.
*/ diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index 11597740e2..cf0876cbd7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -244,7 +244,7 @@ private void addPartition(boolean hasMorePartitions, AlterTableAddPartitionDesc event.replicationSpec(), sourceWarehousePartitionLocation, stagingDir, - context.hiveConf + context.hiveConf, false ); Task movePartitionTask = null; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java index 65588fdbe9..0b540c8980 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java @@ -296,7 +296,14 @@ static TableLocationTuple tableLocation(ImportTableDesc tblDesc, Database parent + table.getCompleteName() + " with source location: " + dataPath.toString() + " and target location " + tgtPath.toString()); - Task copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf); + String dumpDir = context.dumpDirectory; + Path newDataPath = new Path(dumpDir, EximUtil.DATA_PATH_NAME + Path.SEPARATOR + + fromURI.toString().substring(dumpDir.length())); + + String subDirectory = newDataPath.getName(); + tmpPath = new Path(tmpPath, subDirectory); + + Task copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, newDataPath, tmpPath, context.hiveConf, false); MoveWork moveWork = new MoveWork(new HashSet<>(), new HashSet<>(), null, null, false); if (AcidUtils.isTransactionalTable(table)) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java index f2c8e8fd54..76c3a8a272 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java @@ -21,6 +21,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java index fc7f226d77..1040f0115a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork; @@ -79,6 +80,9 @@ // Root directory for dumping bootstrapped tables along with incremental events dump. 
public static final String INC_BOOTSTRAP_ROOT_DIR_NAME = "_bootstrap"; + // Root base directory name for hive. + public static final String REPL_HIVE_BASE_DIR = "hive"; + // Name of the directory which stores the list of tables included in the policy in case of table level replication. // One file per database, named after the db name. The directory is not created for db level replication. public static final String REPL_TABLE_LIST_DIR_NAME = "_tables"; @@ -236,7 +240,8 @@ public static PathFilter getEventsDirectoryFilter(final FileSystem fs) { return p -> { try { return fs.isDirectory(p) && !p.getName().equalsIgnoreCase(ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME) - && !p.getName().equalsIgnoreCase(ReplUtils.REPL_TABLE_LIST_DIR_NAME); + && !p.getName().equalsIgnoreCase(ReplUtils.REPL_TABLE_LIST_DIR_NAME) + && !p.getName().equalsIgnoreCase(EximUtil.DATA_PATH_NAME); } catch (IOException e) { throw new RuntimeException(e); } @@ -246,7 +251,8 @@ public static PathFilter getEventsDirectoryFilter(final FileSystem fs) { public static PathFilter getBootstrapDirectoryFilter(final FileSystem fs) { return p -> { try { - return fs.isDirectory(p) && !p.getName().equalsIgnoreCase(ReplUtils.REPL_TABLE_LIST_DIR_NAME); + return fs.isDirectory(p) && !p.getName().equalsIgnoreCase(ReplUtils.REPL_TABLE_LIST_DIR_NAME) + && !p.getName().equalsIgnoreCase(EximUtil.DATA_PATH_NAME); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 810a4c5284..1fb2d5be94 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; import org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables; import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -385,6 +386,9 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { // Make fully qualified path for further use. loadPath = fs.makeQualified(loadPath); + // Load path points to hive load path by default. + loadPath = new Path(loadPath, ReplUtils.REPL_HIVE_BASE_DIR); + if (!fs.exists(loadPath)) { // supposed dump path does not exist. 
LOG.error("File not found " + loadPath.toUri().toString()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java index 73c863ed1a..84a692aa06 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java @@ -72,28 +72,34 @@ public CopyUtils(String distCpDoAsUser, HiveConf hiveConf, FileSystem destinatio // changed/removed during copy, so double check the checksum after copy, // if not match, copy again from cm public void copyAndVerify(Path destRoot, - List srcFiles) throws IOException, LoginException, HiveFatalException { - Map>> map = fsToFileMap(srcFiles, destRoot); + List srcFiles, Path origSrcPtah) throws IOException, LoginException, HiveFatalException { UserGroupInformation proxyUser = getProxyUser(); + FileSystem sourceFs = origSrcPtah.getFileSystem(hiveConf); + boolean useRegularCopy = regularCopy(sourceFs, srcFiles); try { - for (Map.Entry>> entry : map.entrySet()) { - Map> destMap = entry.getValue(); - for (Map.Entry> destMapEntry : destMap.entrySet()) { - Path destination = destMapEntry.getKey(); - List fileInfoList = destMapEntry.getValue(); - // Get the file system again from cache. There is a chance that the file system stored in the map is closed. - // For instance, doCopyRetry closes the file system in case of i/o exceptions. - FileSystem sourceFs = fileInfoList.get(0).getSourcePath().getFileSystem(hiveConf); - boolean useRegularCopy = regularCopy(sourceFs, fileInfoList); - - if (!destinationFs.exists(destination) - && !FileUtils.mkdir(destinationFs, destination, hiveConf)) { - LOG.error("Failed to create destination directory: " + destination); - throw new IOException("Destination directory creation failed"); + if (!useRegularCopy) { + srcFiles.clear(); + srcFiles.add(new ReplChangeManager.FileInfo(sourceFs, origSrcPtah, null)); + doCopyRetry(sourceFs, srcFiles, destRoot, proxyUser, useRegularCopy); + } else { + Map>> map = fsToFileMap(srcFiles, destRoot); + for (Map.Entry>> entry : map.entrySet()) { + Map> destMap = entry.getValue(); + for (Map.Entry> destMapEntry : destMap.entrySet()) { + Path destination = destMapEntry.getKey(); + List fileInfoList = destMapEntry.getValue(); + // Get the file system again from cache. There is a chance that the file system stored in the map is closed. + // For instance, doCopyRetry closes the file system in case of i/o exceptions. + sourceFs = fileInfoList.get(0).getSourcePath().getFileSystem(hiveConf); + if (!destinationFs.exists(destination) + && !FileUtils.mkdir(destinationFs, destination, hiveConf)) { + LOG.error("Failed to create destination directory: " + destination); + throw new IOException("Destination directory creation failed"); + } + + // Copy files with retry logic on failure or source file is dropped or changed. + doCopyRetry(sourceFs, fileInfoList, destination, proxyUser, true); } - - // Copy files with retry logic on failure or source file is dropped or changed. 
- doCopyRetry(sourceFs, fileInfoList, destination, proxyUser, useRegularCopy); } } } finally { @@ -181,12 +187,12 @@ private void doCopyRetry(FileSystem sourceFs, List s continue; } Path srcPath = srcFile.getEffectivePath(); - Path destPath = new Path(destination, srcPath.getName()); - if (destinationFs.exists(destPath)) { + //Path destPath = new Path(destination, srcPath.getName()); + if (destinationFs.exists(destination)) { // If destination file is present and checksum of source mismatch, then retry copy. if (isSourceFileMismatch(sourceFs, srcFile)) { // Delete the incorrectly copied file and retry with CM path - destinationFs.delete(destPath, true); + destinationFs.delete(destination, true); srcFile.setIsUseSourcePath(false); } else { // If the retry logic is reached after copy error, then include the copied file as well. @@ -293,6 +299,9 @@ private void doCopyOnce(FileSystem sourceFs, List srcList, Path destination, boolean useRegularCopy, UserGroupInformation proxyUser) throws IOException { if (useRegularCopy) { +// if (srcList.size() == 1 && srcList.get(0).getName().equals(destination.getName())) { +// destination = destination.getParent(); +// } doRegularCopyOnce(sourceFs, srcList, destination, proxyUser); } else { doDistCpCopyOnce(sourceFs, srcList, destination, proxyUser); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java index 9e24799382..f2c9156910 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java @@ -115,8 +115,9 @@ void write(final ReplicationSpec forReplicationSpec) throws InterruptedException List dataPathList = Utils.getDataPathList(partition.getDataLocation(), forReplicationSpec, hiveConf); Path rootDataDumpDir = paths.partitionExportDir(partitionName); - new FileOperations(dataPathList, rootDataDumpDir, distCpDoAsUser, hiveConf, mmCtx) - .export(forReplicationSpec); + if (!forReplicationSpec.isLazy()) { + new FileOperations(dataPathList, rootDataDumpDir, distCpDoAsUser, hiveConf, mmCtx).export(); + } LOG.debug("Thread: {}, finish partition dump {}", threadName, partitionName); } catch (Exception e) { throw new RuntimeException(e.getMessage(), e); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java index 97a1dd31a7..af9f9c9c7c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java @@ -160,8 +160,10 @@ private void writeData(PartitionIterable partitions) throws SemanticException { } else { List dataPathList = Utils.getDataPathList(tableSpec.tableHandle.getDataLocation(), replicationSpec, conf); - new FileOperations(dataPathList, paths.dataExportDir(), distCpDoAsUser, conf, mmCtx) - .export(replicationSpec); + // this is the data copy + if (!replicationSpec.isLazy()) { + new FileOperations(dataPathList, paths.dataExportDir(), distCpDoAsUser, conf, mmCtx).export(); + } } } catch (Exception e) { throw new SemanticException(e.getMessage(), e); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java index b9967031cd..e172c783d1 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java @@ -17,15 +17,28 @@ */ package org.apache.hadoop.hive.ql.parse.repl.dump.events; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.messaging.EventMessage; import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer; import org.apache.hadoop.hive.metastore.messaging.MessageEncoder; import org.apache.hadoop.hive.metastore.messaging.MessageFactory; import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.security.auth.login.LoginException; +import java.io.BufferedWriter; +import java.io.File; +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; + abstract class AbstractEventHandler implements EventHandler { static final Logger LOG = LoggerFactory.getLogger(AbstractEventHandler.class); static final MessageEncoder jsonMessageEncoder = JSONMessageEncoder.getInstance(); @@ -71,4 +84,31 @@ public long fromEventId() { public long toEventId() { return event.getEventId(); } + + public void writeFileEntry(String dbName, String tblName, String file, BufferedWriter fileListWriter, + Context withinContext) throws IOException, LoginException { + HiveConf hiveConf = withinContext.hiveConf; + boolean copyActualData = hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_COPY_DATA); + String distCpDoAsUser = hiveConf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER); + if (copyActualData) { + Path dataPath = new Path(withinContext.dumpRoot.toString(), EximUtil.DATA_PATH_NAME); + List dataPathList = new LinkedList<>(); + String[] decodedURISplits = ReplChangeManager.decodeFileUri(file); + String srcDataFile = decodedURISplits[0]; + Path srcDataPath = new Path(srcDataFile); + dataPathList.add(srcDataPath); + FileOperations fileOperations = new FileOperations(dataPathList, dataPath, distCpDoAsUser, hiveConf, + null); + String relativePath = event.getEventId() + File.separator + dbName + File.separator + tblName; + Path targetPath = new Path(dataPath, relativePath); + targetPath = fileOperations.getPathWithSchemeAndAuthority(targetPath, srcDataPath); + targetPath = new Path(targetPath, srcDataFile.substring(srcDataFile.indexOf(decodedURISplits[3]))); + fileOperations.copyOneDataPath(srcDataPath, targetPath); + String encodedTargetPath = ReplChangeManager.encodeFileUri( + targetPath.toString(), decodedURISplits[1], decodedURISplits[3]); + fileListWriter.write(encodedTargetPath + "\n"); + } else { + fileListWriter.write(file.toString() + "\n"); + } + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java index 42e74b37d9..0562b1e0bb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java @@ -110,8 +110,7 @@ public void handle(Context withinContext) throws Exception { // encoded 
filename/checksum of files, write into _files try (BufferedWriter fileListWriter = writer(withinContext, qlPtn)) { for (String file : files) { - fileListWriter.write(file); - fileListWriter.newLine(); + super.writeFileEntry(qlMdTable.getDbName(), qlMdTable.getTableName(), file, fileListWriter, withinContext); } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java index 7d7dc26a25..d59a996c93 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java @@ -32,11 +32,14 @@ import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.metadata.HiveUtils; import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; import org.apache.hadoop.fs.FileSystem; + +import javax.security.auth.login.LoginException; import java.io.BufferedWriter; import java.io.IOException; import java.io.OutputStreamWriter; @@ -60,17 +63,19 @@ private BufferedWriter writer(Context withinContext, Path dataPath) throws IOExc return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath))); } - private void writeDumpFiles(Context withinContext, Iterable files, Path dataPath) throws IOException { + private void writeDumpFiles(Table qlMdTable, Context withinContext, Iterable files, Path dataPath) + throws IOException, LoginException { // encoded filename/checksum of files, write into _files try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) { for (String file : files) { - fileListWriter.write(file + "\n"); + super.writeFileEntry(qlMdTable.getDbName(), qlMdTable.getTableName(), file, fileListWriter, withinContext); } } } private void createDumpFile(Context withinContext, org.apache.hadoop.hive.ql.metadata.Table qlMdTable, - List qlPtns, List> fileListArray) throws IOException, SemanticException { + List qlPtns, List> fileListArray) + throws IOException, SemanticException, LoginException { if (fileListArray == null || fileListArray.isEmpty()) { return; } @@ -86,17 +91,18 @@ private void createDumpFile(Context withinContext, org.apache.hadoop.hive.ql.met if ((null == qlPtns) || qlPtns.isEmpty()) { Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME); - writeDumpFiles(withinContext, fileListArray.get(0), dataPath); + writeDumpFiles(qlMdTable, withinContext, fileListArray.get(0), dataPath); } else { for (int idx = 0; idx < qlPtns.size(); idx++) { Path dataPath = new Path(withinContext.eventRoot, qlPtns.get(idx).getName()); - writeDumpFiles(withinContext, fileListArray.get(idx), dataPath); + writeDumpFiles(qlMdTable, withinContext, fileListArray.get(idx), dataPath); } } } private void createDumpFileForTable(Context withinContext, org.apache.hadoop.hive.ql.metadata.Table qlMdTable, - List qlPtns, List> fileListArray) throws IOException, SemanticException { + List qlPtns, List> fileListArray) + throws IOException, SemanticException, LoginException { Path newPath = HiveUtils.getDumpPath(withinContext.eventRoot, qlMdTable.getDbName(), qlMdTable.getTableName()); Context context = new Context(withinContext); context.setEventRoot(newPath); diff 
--git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java index 355374aeb8..7467d323f0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java @@ -84,7 +84,7 @@ public void handle(Context withinContext) throws Exception { // encoded filename/checksum of files, write into _files try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) { for (String file : files) { - fileListWriter.write(file + "\n"); + super.writeFileEntry(qlMdTable.getDbName(), qlMdTable.getTableName(), file, fileListWriter, withinContext); } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java index 7d00f89a5b..74a16b41aa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java @@ -36,6 +36,7 @@ DumpType dumpType(); class Context { + Path dumpRoot; Path eventRoot; final Path cmRoot; final Hive db; @@ -45,8 +46,9 @@ final ReplScope oldReplScope; private Set tablesForBootstrap; - public Context(Path eventRoot, Path cmRoot, Hive db, HiveConf hiveConf, ReplicationSpec replicationSpec, + public Context(Path eventRoot, Path dumpRoot, Path cmRoot, Hive db, HiveConf hiveConf, ReplicationSpec replicationSpec, ReplScope replScope, ReplScope oldReplScope, Set tablesForBootstrap) { + this.dumpRoot = dumpRoot; this.eventRoot = eventRoot; this.cmRoot = cmRoot; this.db = db; @@ -58,6 +60,7 @@ public Context(Path eventRoot, Path cmRoot, Hive db, HiveConf hiveConf, Replicat } public Context(Context other) { + this.dumpRoot = other.dumpRoot; this.eventRoot = other.eventRoot; this.cmRoot = other.cmRoot; this.db = other.db; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java index 5a18d573cf..c262850d29 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java @@ -97,7 +97,7 @@ public void handle(Context withinContext) throws Exception { // encoded filename/checksum of files, write into _files try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) { for (String file : files) { - fileListWriter.write(file + "\n"); + super.writeFileEntry(qlMdTable.getDbName(), qlMdTable.getTableName(), file, fileListWriter, withinContext); } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java index fc5419ce3f..43032dea4c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java @@ -18,9 +18,9 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.io; import java.io.BufferedWriter; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.OutputStreamWriter; +import java.net.URI; import java.util.ArrayList; import java.util.List; @@ -29,25 +29,19 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import 
org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.ReplChangeManager; -import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer; -import org.apache.hadoop.hive.ql.parse.ReplicationSpec; -import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.CopyUtils; import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; -import org.apache.hadoop.hive.shims.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.hadoop.hive.ql.ErrorMsg.FILE_NOT_FOUND; //TODO: this object is created once to call one method and then immediately destroyed. //So it's basically just a roundabout way to pass arguments to a static method. Simplify? @@ -75,12 +69,8 @@ public FileOperations(List dataPathList, Path exportRootDataDir, String di exportFileSystem = exportRootDataDir.getFileSystem(hiveConf); } - public void export(ReplicationSpec forReplicationSpec) throws Exception { - if (forReplicationSpec.isLazy()) { - exportFilesAsList(); - } else { - copyFiles(); - } + public void export() throws Exception { + copyFiles(); } /** @@ -96,7 +86,7 @@ private void copyFiles() throws IOException, LoginException { } } - private void copyOneDataPath(Path fromPath, Path toPath) throws IOException, LoginException { + public void copyOneDataPath(Path fromPath, Path toPath) throws IOException, LoginException { FileStatus[] fileStatuses = LoadSemanticAnalyzer.matchFilesOrDir(dataFileSystem, fromPath); List srcPaths = new ArrayList<>(); for (FileStatus fileStatus : fileStatuses) { @@ -141,68 +131,12 @@ private void copyMmPath() throws LoginException, IOException { } } - /** - * This needs the root data directory to which the data needs to be exported to. - * The data export here is a list of files either in table/partition that are written to the _files - * in the exportRootDataDir provided. - */ - private void exportFilesAsList() throws SemanticException, IOException, LoginException { - if (dataPathList.isEmpty()) { - return; - } - boolean done = false; - int repeat = 0; - while (!done) { - // This is only called for replication that handles MM tables; no need for mmCtx. 
- try (BufferedWriter writer = writer()) { - for (Path dataPath : dataPathList) { - writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath)); - } - done = true; - } catch (IOException e) { - if (e instanceof FileNotFoundException) { - logger.error("exporting data files in dir : " + dataPathList + " to " + exportRootDataDir + " failed"); - throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage())); - } - repeat++; - logger.info("writeFilesList failed", e); - if (repeat >= FileUtils.MAX_IO_ERROR_RETRY) { - logger.error("exporting data files in dir : " + dataPathList + " to " + exportRootDataDir + " failed"); - throw new IOException(ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getMsg()); - } - - int sleepTime = FileUtils.getSleepTime(repeat - 1); - logger.info(" sleep for {} milliseconds for retry num {} ", sleepTime , repeat); - try { - Thread.sleep(sleepTime); - } catch (InterruptedException timerEx) { - logger.info("thread sleep interrupted", timerEx.getMessage()); - } - - // in case of io error, reset the file system object - FileSystem.closeAllForUGI(Utils.getUGI()); - dataFileSystem = dataPathList.get(0).getFileSystem(hiveConf); - exportFileSystem = exportRootDataDir.getFileSystem(hiveConf); - Path exportPath = new Path(exportRootDataDir, EximUtil.FILES_NAME); - if (exportFileSystem.exists(exportPath)) { - exportFileSystem.delete(exportPath, true); - } - } - } - } - - private void writeFilesList(FileStatus[] fileStatuses, BufferedWriter writer, String encodedSubDirs) - throws IOException { - for (FileStatus fileStatus : fileStatuses) { - if (fileStatus.isDirectory()) { - // Write files inside the sub-directory. - Path subDir = fileStatus.getPath(); - writeFilesList(listFilesInDir(subDir), writer, encodedSubDir(encodedSubDirs, subDir)); - } else { - writer.write(encodedUri(fileStatus, encodedSubDirs)); - writer.newLine(); - } + public Path getPathWithSchemeAndAuthority(Path targetFilePath, Path currentFilePath) { + if (targetFilePath.toUri().getScheme() == null) { + URI currentURI = currentFilePath.toUri(); + targetFilePath = new Path(currentURI.getScheme(), currentURI.getAuthority(), targetFilePath.toUri().getPath()); } + return targetFilePath; } private FileStatus[] listFilesInDir(Path path) throws IOException { diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java index aacd29591d..67255f2297 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java @@ -127,8 +127,8 @@ public void removeDBPropertyToPreventRenameWhenBootstrapDumpOfTableFails() throw private int tableDumpCount = 0; @Override - void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, - long lastReplId, Hive hiveDb, HiveWrapper.Tuple
tuple) + void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, Path replDataDir, + long lastReplId, Hive hiveDb, HiveWrapper.Tuple<Table> tuple) throws Exception { tableDumpCount++; if (tableDumpCount > 1) {
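
For illustration only, not part of the patch: a minimal sketch of how the new hive.repl.dump.copydata flag is expected to be exercised end to end, using only the driver APIs already shown in the test above. The class name ReplCopyDataSketch and the database names src_db / src_db_replica are made up, and the sketch assumes a metastore, warehouse and hive.repl.rootdir configured as in the test setup.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.DriverFactory;
import org.apache.hadoop.hive.ql.IDriver;
import org.apache.hadoop.hive.ql.session.SessionState;

public class ReplCopyDataSketch {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    // New flag from this patch: the dump carries the table data itself,
    // instead of only the lazy _files pointers to the source file system.
    conf.setBoolVar(HiveConf.ConfVars.REPL_DUMP_COPY_DATA, true);
    SessionState.start(new CliSessionState(conf));
    IDriver driver = DriverFactory.newDriver(conf);

    // REPL DUMP returns the base dump directory; per the ReplDumpTask changes,
    // metadata lands under <base>/hive and copied data under its data subtree.
    driver.run("REPL DUMP src_db");
    List<String> row = new ArrayList<>();
    driver.getResults(row);
    String dumpLocation = row.get(0).split("\\t")[0];
    String lastReplId = row.get(0).split("\\t")[1];

    // REPL LOAD is still pointed at the base directory; ReplicationSemanticAnalyzer
    // now appends the "hive" subdirectory (ReplUtils.REPL_HIVE_BASE_DIR) itself.
    driver.run("REPL LOAD src_db_replica FROM '" + dumpLocation + "'");
    System.out.println("Replicated up to event id " + lastReplId);
  }
}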