From 4798cbcf3acf4cd86242d2948d71c53c5b9fd8b2 Mon Sep 17 00:00:00 2001 From: Pratyush Madhukar Date: Tue, 12 Jan 2021 15:24:29 +0530 Subject: [PATCH] HIVE-24624: Repl Load should detect the compatible staging dir --- .../apache/hadoop/hive/ql/exec/Utilities.java | 1 + .../hive/ql/exec/repl/ReplDumpTask.java | 2 + .../ql/parse/ReplicationSemanticAnalyzer.java | 9 ++ .../hive/ql/parse/repl/load/DumpMetaData.java | 23 +++++- .../ql/parse/repl/load/TestDumpMetaData.java | 82 +++++++++++++++++++ 5 files changed, 114 insertions(+), 3 deletions(-) create mode 100644 ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/TestDumpMetaData.java diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index e77b2956c9..1d6e409bf8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -253,6 +253,7 @@ public static final String MAPNAME = "Map "; public static final String REDUCENAME = "Reducer "; public static final String ENSURE_OPERATORS_EXECUTED = "ENSURE_OPERATORS_EXECUTED"; + public static final int MIN_VERSION_FOR_NEW_DUMP_FORMAT = 3; @Deprecated protected static final String DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX = "mapred.dfsclient.parallelism.max"; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 6c64b5a6d1..1659fa7f28 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -111,6 +111,7 @@ import java.util.concurrent.TimeUnit; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_ABORT_WRITE_TXN_AFTER_TIMEOUT; +import static org.apache.hadoop.hive.ql.exec.Utilities.MIN_VERSION_FOR_NEW_DUMP_FORMAT; import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Writer; import 
static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT; import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.RANGER_AUTHORIZER; @@ -180,6 +181,7 @@ public int execute() { initiateAuthorizationDumpTask(); } DumpMetaData dmd = new DumpMetaData(hiveDumpRoot, conf); + dmd.setHiveVersion(MIN_VERSION_FOR_NEW_DUMP_FORMAT); // Initialize ReplChangeManager instance since we will require it to encode file URI. ReplChangeManager.getInstance(conf); Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR)); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 60ada73ae9..91a7c26efc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.repl.ReplAck; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork; @@ -326,6 +327,14 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { if (loadPath != null) { DumpMetaData dmd = new DumpMetaData(loadPath, conf); + if (!dmd.isVersionCompatible()) { + throw new SemanticException + ( + "Dump version: " + dmd.getHiveVersion() + ". Versions older than " + + Utilities.MIN_VERSION_FOR_NEW_DUMP_FORMAT + " are not supported." + ); + } + boolean evDump = false; // we will decide what hdfs locations needs to be copied over here as well. 
if (dmd.isIncrementalDump()) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java index eb87bb925d..6ba7777dd1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java @@ -53,6 +53,7 @@ private final HiveConf hiveConf; private Long dumpExecutionId; private boolean replScopeModified = false; + private int hiveVersion; public DumpMetaData(Path dumpRoot, HiveConf hiveConf) { this.hiveConf = hiveConf; @@ -124,14 +125,16 @@ private void loadDumpFromFile() throws SemanticException { br = new BufferedReader(new InputStreamReader(fs.open(dumpFile))); String line; if ((line = br.readLine()) != null) { - String[] lineContents = line.split("\t", 7); + String[] lineContents = line.split("\t", 8); setDump(lineContents[0].equals(Utilities.nullStringOutput) ? null : DumpType.valueOf(lineContents[0]), lineContents[1].equals(Utilities.nullStringOutput) ? null : Long.valueOf(lineContents[1]), - lineContents[2].equals(Utilities.nullStringOutput) ? null : Long.valueOf(lineContents[2]), + lineContents[2].equals(Utilities.nullStringOutput) ? null : Long.valueOf(lineContents[2]), lineContents[3].equals(Utilities.nullStringOutput) ? null : new Path(lineContents[3]), lineContents[4].equals(Utilities.nullStringOutput) ? null : Long.valueOf(lineContents[4]), Boolean.valueOf(lineContents[6])); setPayload(lineContents[5].equals(Utilities.nullStringOutput) ? null : lineContents[5]); + setHiveVersion((lineContents.length < 8 + || lineContents[7].equals(Utilities.nullStringOutput)) ? -1 : Integer.parseInt(lineContents[7])); } else { throw new IOException( "Unable to read valid values from dumpFile:" + dumpFile.toUri().toString()); @@ -235,11 +238,25 @@ public void write(boolean replace) throws SemanticException { cmRoot != null ? cmRoot.toString() : null, dumpExecutionId != null ? 
dumpExecutionId.toString() : null, payload, - String.valueOf(replScopeModified)) + String.valueOf(replScopeModified), + String.valueOf(hiveVersion)) ); if (replScope != null) { listValues.add(prepareReplScopeValues()); } Utils.writeOutput(listValues, dumpFile, hiveConf, replace); } + + public int getHiveVersion() { + return this.hiveVersion; + } + + public void setHiveVersion(int hiveVersion) { + this.hiveVersion = hiveVersion; + } + + public boolean isVersionCompatible() throws SemanticException { + initializeIfNot(); + return this.hiveVersion >= Utilities.MIN_VERSION_FOR_NEW_DUMP_FORMAT; + } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/TestDumpMetaData.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/TestDumpMetaData.java new file mode 100644 index 0000000000..f921e8143d --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/TestDumpMetaData.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.parse.repl.load; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.DumpType; +import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; + +public class TestDumpMetaData { + private Path dumpRoot = new Path("file:///tmp"); + private HiveConf conf = new HiveConf(); + private DumpMetaData dmd = new DumpMetaData(dumpRoot, conf); + + @Before + public void setUp() { + dmd.setDump(DumpType.BOOTSTRAP, 12L, 14L, new Path("file:///tmp/cmroot"), 1L, false); + } + + @Test + public void testIncompatibleVersion() throws SemanticException { + dmd.setHiveVersion(2); + dmd.write(true); + + DumpMetaData dmdLoad = new DumpMetaData(dumpRoot, conf); + assertFalse(dmdLoad.isVersionCompatible()); + } + + @Test + public void testCompatibleVersion() throws SemanticException { + dmd.setHiveVersion(3); + dmd.write(true); + + DumpMetaData dmdLoad = new DumpMetaData(dumpRoot, conf); + assertTrue(dmdLoad.isVersionCompatible()); + } + + @Test + public void testEmptyVersion() throws SemanticException { + List<List<String>> listValues = new ArrayList<>(); + listValues.add( + Arrays.asList( + DumpType.BOOTSTRAP.toString(), + "12", + "14", + "file:///tmp/cmroot", + "1", + "payload", + "false") + ); + Utils.writeOutput(listValues, new Path(dumpRoot, DumpMetaData.getDmdFileName()), conf, true); + + DumpMetaData dmdLoad = new DumpMetaData(dumpRoot, conf); + assertFalse(dmdLoad.isVersionCompatible()); + } + +} -- 2.24.3 (Apple Git-128)