diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 82ecad1065..79ee80aed0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
@@ -216,10 +217,10 @@ private ReplicationSpec getNewEventOnlyReplicationSpec(Long eventId) {
     return rspec;
   }
 
-  private Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws Exception {
+  Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws Exception {
     // bootstrap case
     Hive hiveDb = getHive();
-    Long bootDumpBeginReplId = hiveDb.getMSC().getCurrentNotificationEventId().getEventId();
+    Long bootDumpBeginReplId = currentNotificationId(hiveDb);
     String validTxnList = getValidTxnListForReplDump(hiveDb);
     for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) {
       LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName);
@@ -231,16 +232,35 @@ private Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws
       dumpFunctionMetadata(dbName, dumpRoot);
 
       String uniqueKey = Utils.setDbBootstrapDumpState(hiveDb, dbName);
-      for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.tableNameOrPattern)) {
-        LOG.debug(
-            "analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri());
-        dumpTable(dbName, tblName, validTxnList, dbRoot);
-        dumpConstraintMetadata(dbName, tblName, dbRoot);
+      Exception caught = null;
+      try {
+        for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.tableNameOrPattern)) {
+          LOG.debug(
+              "analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri());
+          dumpTable(dbName, tblName, validTxnList, dbRoot);
+          dumpConstraintMetadata(dbName, tblName, dbRoot);
+        }
+      } catch (Exception e) {
+        caught = e;
+      } finally {
+        try {
+          Utils.resetDbBootstrapDumpState(hiveDb, dbName, uniqueKey);
+        } catch (Exception e) {
+          if (caught == null) {
+            throw e;
+          } else {
+            LOG.error("failed to reset the db state for " + uniqueKey +
+                " on failure of repl dump", e);
+            throw caught;
+          }
+        }
+        if (caught != null) {
+          throw caught;
+        }
       }
-      Utils.resetDbBootstrapDumpState(hiveDb, dbName, uniqueKey);
 
       replLogger.endLog(bootDumpBeginReplId.toString());
     }
-    Long bootDumpEndReplId = hiveDb.getMSC().getCurrentNotificationEventId().getEventId();
+    Long bootDumpEndReplId = currentNotificationId(hiveDb);
     LOG.info("Bootstrap object dump phase took from {} to {}",
         bootDumpBeginReplId, bootDumpEndReplId);
@@ -274,7 +294,11 @@ private Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws
     return bootDumpBeginReplId;
   }
 
-  private Path dumpDbMetadata(String dbName, Path dumpRoot, long lastReplId) throws Exception {
+  long currentNotificationId(Hive hiveDb) throws TException {
+    return hiveDb.getMSC().getCurrentNotificationEventId().getEventId();
+  }
+
+  Path dumpDbMetadata(String dbName, Path dumpRoot, long lastReplId) throws Exception {
     Path dbRoot = new Path(dumpRoot, dbName);
     // TODO : instantiating FS objects are generally costly. Refactor
     FileSystem fs = dbRoot.getFileSystem(conf);
@@ -284,7 +308,7 @@ private Path dumpDbMetadata(String dbName, Path dumpRoot, long lastReplId) throw
     return dbRoot;
   }
 
-  private void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot) throws Exception {
+  void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot) throws Exception {
     try {
       Hive db = getHive();
       HiveWrapper.Tuple<Table> tuple = new HiveWrapper(db, dbName).table(tblName);
@@ -331,7 +355,7 @@ private String getValidWriteIdList(String dbName, String tblName, String validTx
     return openTxns;
   }
 
-  private String getValidTxnListForReplDump(Hive hiveDb) throws HiveException {
+  String getValidTxnListForReplDump(Hive hiveDb) throws HiveException {
     // Key design point for REPL DUMP is to not have any txns older than current txn in which dump runs.
     // This is needed to ensure that Repl dump doesn't copy any data files written by any open txns
     // mainly for streaming ingest case where one delta file shall have data from committed/aborted/open txns.
@@ -396,7 +420,7 @@ private String getNextDumpDir() {
     }
   }
 
-  private void dumpFunctionMetadata(String dbName, Path dumpRoot) throws Exception {
+  void dumpFunctionMetadata(String dbName, Path dumpRoot) throws Exception {
     Path functionsRoot = new Path(new Path(dumpRoot, dbName), FUNCTIONS_ROOT_DIR_NAME);
     List<String> functionNames = getHive().getFunctions(dbName, "*");
     for (String functionName : functionNames) {
@@ -415,7 +439,7 @@ private void dumpFunctionMetadata(String dbName, Path dumpRoot) throws Exception
     }
   }
 
-  private void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot) throws Exception {
+  void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot) throws Exception {
     try {
       Path constraintsRoot = new Path(dbRoot, CONSTRAINTS_ROOT_DIR_NAME);
       Path commonConstraintsFile = new Path(constraintsRoot, ConstraintFileType.COMMON.getPrefix() + tblName);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
index 62d699f706..59ffb90328 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
@@ -75,7 +75,7 @@ public static void writeOutput(List<String> values, Path outputFile, HiveConf hi
     }
   }
 
-  public static Iterable<? extends String> matchesDb(Hive db, String dbPattern) throws HiveException {
+  public static Iterable<String> matchesDb(Hive db, String dbPattern) throws HiveException {
     if (dbPattern == null) {
       return db.getAllDatabases();
     } else {
@@ -83,7 +83,7 @@ public static void writeOutput(List<String> values, Path outputFile, HiveConf hi
     }
   }
 
-  public static Iterable<? extends String> matchesTbl(Hive db, String dbName, String tblPattern)
+  public static Iterable<String> matchesTbl(Hive db, String dbName, String tblPattern)
       throws HiveException {
     if (tblPattern == null) {
       return getAllTables(db, dbName);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTaskTest.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTaskTest.java
new file mode 100644
index 0000000000..7bd035e076
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTaskTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.powermock.api.mockito.PowerMockito.verifyStatic;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ Utils.class })
+@PowerMockIgnore({ "javax.management.*" })
+public class ReplDumpTaskTest {
+
+  @Mock
+  private Hive hive;
+
+  class StubReplDumpTask extends ReplDumpTask {
+
+    @Override
+    protected Hive getHive() {
+      return hive;
+    }
+
+    @Override
+    long currentNotificationId(Hive hiveDb) {
+      return Long.MAX_VALUE;
+    }
+
+    @Override
+    String getValidTxnListForReplDump(Hive hiveDb) {
+      return "";
+    }
+
+    @Override
+    void dumpFunctionMetadata(String dbName, Path dumpRoot) {
+    }
+
+    @Override
+    Path dumpDbMetadata(String dbName, Path dumpRoot, long lastReplId) {
+      return Mockito.mock(Path.class);
+    }
+
+    @Override
+    void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot) {
+    }
+  }
+
+  private static class TestException extends Exception {
+  }
+
+  @Test(expected = TestException.class)
+  public void removeDBPropertyToPreventRenameWhenBootstrapDumpOfTableFails() throws Exception {
+    List<String> tableList = Arrays.asList("a1", "a2");
+    String dbRandomKey = "akeytoberandom";
+
+    mockStatic(Utils.class);
+    when(Utils.matchesDb(same(hive), eq("default")))
+        .thenReturn(Collections.singletonList("default"));
+    when(Utils.getAllTables(same(hive), eq("default"))).thenReturn(tableList);
+    when(Utils.setDbBootstrapDumpState(same(hive), eq("default"))).thenReturn(dbRandomKey);
+    when(Utils.matchesTbl(same(hive), eq("default"), anyString())).thenReturn(tableList);
+
+
+    when(hive.getAllFunctions()).thenReturn(Collections.emptyList());
+
+    ReplDumpTask task = new StubReplDumpTask() {
+      private int tableDumpCount = 0;
+
+      @Override
+      void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot)
+          throws Exception {
+        tableDumpCount++;
+        if (tableDumpCount > 1) {
+          throw new TestException();
+        }
+      }
+    };
+
+    task.setWork(
+        new ReplDumpWork("default", "",
+            Long.MAX_VALUE, Long.MAX_VALUE, "",
+            Integer.MAX_VALUE, "")
+    );
+
+    try {
+      task.bootStrapDump(mock(Path.class), null, mock(Path.class));
+    } finally {
+      verifyStatic();
+      Utils.resetDbBootstrapDumpState(same(hive), eq("default"), eq(dbRandomKey));
+    }
+  }
+}
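
Reviewer note: the heart of this patch is the new cleanup logic in bootStrapDump(), which is easier to read outside of diff markers. Below is a minimal standalone sketch of the same error-propagation pattern, assuming hypothetical stand-ins dumpTables() and resetBootstrapDumpState() for the Hive calls; it is illustrative only, not the patched code. The point: the bootstrap-dump state is always reset, a failure during the dump takes precedence over a secondary failure during the reset (which is only logged), and a reset failure on an otherwise clean run still surfaces.

// Standalone sketch (not Hive code) of the try/catch/finally pattern above.
public class ResetOnFailureSketch {

  static void dumpWithReset() throws Exception {
    Exception caught = null;
    try {
      dumpTables();                    // may fail part-way through
    } catch (Exception e) {
      caught = e;                      // remember the primary failure
    } finally {
      try {
        resetBootstrapDumpState();     // always attempt the cleanup
      } catch (Exception e) {
        if (caught == null) {
          throw e;                     // cleanup failed on an otherwise clean run
        }
        System.err.println("reset failed after dump failure: " + e);
        throw caught;                  // primary failure takes precedence
      }
      if (caught != null) {
        throw caught;                  // rethrow after successful cleanup
      }
    }
  }

  // Hypothetical stand-ins for dumpTable()/Utils.resetDbBootstrapDumpState().
  static void dumpTables() throws Exception {
    throw new Exception("simulated table dump failure");
  }

  static void resetBootstrapDumpState() throws Exception {
    // no-op: the reset succeeds in this sketch
  }

  public static void main(String[] args) {
    try {
      dumpWithReset();
    } catch (Exception e) {
      System.out.println("propagated: " + e.getMessage());  // the dump failure wins
    }
  }
}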