diff --git a/hcatalog/webhcat/java-client/pom.xml b/hcatalog/webhcat/java-client/pom.xml
index 3bb9f4d543..9726df1413 100644
--- a/hcatalog/webhcat/java-client/pom.xml
+++ b/hcatalog/webhcat/java-client/pom.xml
@@ -82,5 +82,27 @@
test
-
+
+ ${basedir}/src/main/java
+ ${basedir}/src/test
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+
+ test-jar
+
+
+
+
+ *.xml
+
+
+
+
+
+
+
diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ErroredReplicationTask.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ErroredReplicationTask.java
new file mode 100644
index 0000000000..f30e9128a0
--- /dev/null
+++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ErroredReplicationTask.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.api.repl;
+
+import org.apache.hive.hcatalog.api.HCatNotificationEvent;
+
+/**
+ * ErroredReplicationTask is a special kind of NoopReplicationTask in that it
+ * is not actionable, and wraps an error that might have occurred during Task
+ * instantiation time. This is used to protect "future" events that we know
+ * nothing about from breaking the system by throwing IllegalStateExceptions.
+ *
+ * Whether or not the user intends to do something with these tasks and act
+ * upon the exceptions is left to the user to determine how they can best use them.
+ *
+ */
+public class ErroredReplicationTask extends NoopReplicationTask {
+
+ RuntimeException errorCause = null;
+
+ public ErroredReplicationTask(HCatNotificationEvent event, RuntimeException e) {
+ super(event);
+ this.errorCause = e;
+ }
+
+ public RuntimeException getCause(){
+ return this.errorCause;
+ }
+
+ @Override
+ public boolean isActionable(){
+ return false;
+ }
+
+}
diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/EximReplicationTaskFactory.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/EximReplicationTaskFactory.java
index 64ddae2ad9..fd0c0feba1 100644
--- a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/EximReplicationTaskFactory.java
+++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/EximReplicationTaskFactory.java
@@ -20,7 +20,7 @@
import org.apache.hive.hcatalog.api.HCatClient;
import org.apache.hive.hcatalog.api.HCatNotificationEvent;
-import org.apache.hive.hcatalog.api.repl.NoopReplicationTask;
+import org.apache.hive.hcatalog.api.repl.ErroredReplicationTask;
import org.apache.hive.hcatalog.api.repl.ReplicationTask;
import org.apache.hive.hcatalog.common.HCatConstants;
@@ -57,7 +57,8 @@ public ReplicationTask create(HCatClient client, HCatNotificationEvent event){
} else if (event.getEventType().equals(HCatConstants.HCAT_INSERT_EVENT)) {
return new InsertReplicationTask(event);
} else {
- throw new IllegalStateException("Unrecognized Event type, no replication task available");
+ return new ErroredReplicationTask(event, new IllegalStateException(
+ "Unrecognized Event type, no replication task available"));
}
}
}
diff --git a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/ReplicationV1CompatRule.java b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/ReplicationV1CompatRule.java
new file mode 100644
index 0000000000..61915100bb
--- /dev/null
+++ b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/ReplicationV1CompatRule.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.api.repl;
+
+import com.google.common.primitives.Ints;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.EventUtils;
+import org.apache.hive.hcatalog.api.HCatClient;
+import org.apache.hive.hcatalog.api.HCatNotificationEvent;
+import org.apache.thrift.TException;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Utility class to enable testing of Replv1 compatibility testing.
+ *
+ * If event formats/etc change in the future, testing against this allows tests
+ * to determine if they break backward compatibility with Replv1.
+ *
+ * Use as a junit TestRule on tests that generate events to test if the events
+ * generated are compatible with replv1.
+ */
+public class ReplicationV1CompatRule implements TestRule {
+
+ public @interface SkipReplV1CompatCheck {
+
+ }
+
+ protected static final Logger LOG = LoggerFactory.getLogger(ReplicationV1CompatRule.class);
+
+ private static ThreadLocal testEventId = null;
+ private IMetaStoreClient metaStoreClient = null;
+ private HiveConf hconf = null;
+ private List testsToSkip = null;
+
+ public ReplicationV1CompatRule(IMetaStoreClient metaStoreClient, HiveConf hconf){
+ this(metaStoreClient, hconf, new ArrayList());
+ }
+ public ReplicationV1CompatRule(IMetaStoreClient metaStoreClient, HiveConf hconf, List testsToSkip){
+ this.metaStoreClient = metaStoreClient;
+ this.hconf = hconf;
+ testEventId = new ThreadLocal(){
+ @Override
+ protected Long initialValue(){
+ return getCurrentNotificationId();
+ }
+ };
+ this.testsToSkip = testsToSkip;
+ LOG.info("Replv1 backward compatibility tester initialized at " + testEventId.get());
+ }
+
+ private Long getCurrentNotificationId(){
+ CurrentNotificationEventId cid = null;
+ try {
+ cid = metaStoreClient.getCurrentNotificationEventId();
+ Long l = cid.getEventId();
+ return (l == null)? 0L : l;
+ } catch (TException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Helper method to verify that all events generated since last call are compatible with
+ * replv1. If this is called multiple times, it does this check for all events incurred
+ * since the last time it was called.
+ *
+ * @param eventsMustExist : Determines whether or not non-presence of events should be
+ * considered an error. You probably don't need this except during test development
+ * for validation. If you're running this for a whole set of tests in one go, not
+ * having any events is probably an error condition.
+ */
+ public void doBackwardCompatibilityCheck(boolean eventsMustExist) {
+
+ Long testEventIdPrev = testEventId.get();
+ Long testEventIdNow = getCurrentNotificationId();
+
+ testEventId.set(testEventIdNow);
+
+ if (eventsMustExist){
+ assertTrue("New events must exist between old["
+ + testEventIdPrev + "] and [" + testEventIdNow + "]",
+ testEventIdNow > testEventIdPrev);
+ } else if (testEventIdNow <= testEventIdPrev){
+ return; // nothing further to test.
+ }
+ doBackwardCompatibilityCheck(testEventIdPrev,testEventIdNow);
+ }
+
+
+ public void doBackwardCompatibilityCheck(long testEventIdBefore, long testEventIdAfter){
+ // try to instantiate the old replv1 task generation on every event produced.
+ long timeBefore = System.currentTimeMillis();
+
+ Map unhandledTasks = new LinkedHashMap<>();
+ Map incompatibleTasks = new LinkedHashMap<>();
+ int eventCount = 0;
+
+ LOG.info( "Checking replv1 backward compatibility for events between : "
+ + testEventIdBefore + " -> " + testEventIdAfter);
+ IMetaStoreClient.NotificationFilter evFilter =
+ new IMetaStoreClient.NotificationFilter() {
+ @Override
+ public boolean accept(NotificationEvent notificationEvent) {
+ return true;
+ }
+ };
+ EventUtils.MSClientNotificationFetcher evFetcher =
+ new EventUtils.MSClientNotificationFetcher(metaStoreClient);
+ try {
+ EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(
+ evFetcher, testEventIdBefore,
+ Ints.checkedCast(testEventIdAfter - testEventIdBefore) + 1,
+ evFilter);
+ ReplicationTask.resetFactory(null);
+ assertTrue("We should have found some events",evIter.hasNext());
+ while (evIter.hasNext()){
+ eventCount++;
+ NotificationEvent ev = evIter.next();
+ // convert to HCatNotificationEvent, and then try to instantiate a ReplicationTask on it.
+ try {
+ ReplicationTask rtask = ReplicationTask.create(HCatClient.create(hconf), new HCatNotificationEvent(ev));
+ if (rtask instanceof ErroredReplicationTask) {
+ unhandledTasks.put(ev, ((ErroredReplicationTask) rtask).getCause());
+ }
+ } catch (RuntimeException re){
+ incompatibleTasks.put(ev, re);
+ }
+ }
+ } catch (IOException e) {
+ assertNull("Got an exception when we shouldn't have - replv1 backward incompatibility issue:",e);
+ }
+
+ if (unhandledTasks.size() > 0){
+ LOG.warn("Events found that would not be coverable by replv1 replication: " + unhandledTasks.size());
+ for (NotificationEvent ev : unhandledTasks.keySet()){
+ RuntimeException re = unhandledTasks.get(ev);
+ LOG.warn(
+ "ErroredReplicationTask encountered - new event type does not correspond to a replv1 task:"
+ + ev.toString(), re);
+ }
+ }
+ if (incompatibleTasks.size() > 0){
+ LOG.warn("Events found that caused errors in replv1 replication: " + incompatibleTasks.size());
+ for (NotificationEvent ev : incompatibleTasks.keySet()){
+ RuntimeException re = incompatibleTasks.get(ev);
+ LOG.warn(
+ "RuntimeException encountered - new event type caused a replv1 break."
+ + ev.toString(), re);
+ }
+ }
+ assertEquals(0,incompatibleTasks.size());
+
+ long timeAfter = System.currentTimeMillis();
+ LOG.info("Backward compatibility check timing:" + timeBefore + " -> " + timeAfter
+ + ", ev: " + testEventIdBefore + " => " + testEventIdAfter
+ + ", #events processed=" + eventCount);
+ }
+
+ @Override
+ public Statement apply(Statement statement, Description description) {
+ return new Statement() {
+ @Override
+ public void evaluate() throws Throwable {
+ Long prevNotificationId = getCurrentNotificationId();
+ statement.evaluate();
+ Long currNotificationId = getCurrentNotificationId();
+ if(!testsToSkip.contains(description.getMethodName())){
+ doBackwardCompatibilityCheck(prevNotificationId,currNotificationId);
+ } else {
+ LOG.info("Skipping backward compatibility check, as requested, for test :" + description);
+ }
+ }
+ };
+ }
+}
diff --git a/itests/hcatalog-unit/pom.xml b/itests/hcatalog-unit/pom.xml
index c157aedca2..bb6b10554f 100644
--- a/itests/hcatalog-unit/pom.xml
+++ b/itests/hcatalog-unit/pom.xml
@@ -72,6 +72,13 @@
test
+ org.apache.hive.hcatalog
+ hive-webhcat-java-client
+ ${project.version}
+ test-jar
+ test
+
+
org.apache.hive
hive-hbase-handler
${project.version}
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
index 50d8878e1c..2168a67d93 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
@@ -94,11 +94,14 @@
import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.hcatalog.api.repl.ReplicationV1CompatRule;
import org.apache.hive.hcatalog.data.Pair;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TestRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -118,6 +121,20 @@
private int startTime;
private long firstEventId;
+ private static List testsToSkipForReplV1BackwardCompatTesting =
+ new ArrayList<>(Arrays.asList("cleanupNotifs", "sqlTempTable"));
+ // Make sure we skip backward-compat checking for those tests that don't generate events
+
+ private static ReplicationV1CompatRule bcompat = null;
+
+ @Rule
+ public TestRule replV1BackwardCompatibleRule = bcompat;
+ // Note - above looks funny because it seems like we're instantiating a static var, and
+ // then a non-static var as the rule, but the reason this is required is because Rules
+ // are not allowed to be static, but we wind up needing it initialized from a static
+ // context. So, bcompat is initialzed in a static context, but this rule is initialized
+ // before the tests run, and will pick up an initialized value of bcompat.
+
/* This class is used to verify that HiveMetaStore calls the non-transactional listeners with the
* current event ID set by the DbNotificationListener class */
public static class MockMetaStoreEventListener extends MetaStoreEventListener {
@@ -238,6 +255,8 @@ public static void connectToMetastore() throws Exception {
msClient = new HiveMetaStoreClient(conf);
driver = new Driver(conf);
md = MessageFactory.getInstance().getDeserializer();
+
+ bcompat = new ReplicationV1CompatRule(msClient, conf, testsToSkipForReplV1BackwardCompatTesting );
}
@Before
diff --git a/itests/hive-unit/pom.xml b/itests/hive-unit/pom.xml
index 8be25b2401..8adf309430 100644
--- a/itests/hive-unit/pom.xml
+++ b/itests/hive-unit/pom.xml
@@ -86,6 +86,11 @@
${project.version}
+ org.apache.hive.hcatalog
+ hive-webhcat-java-client
+ ${project.version}
+
+
org.apache.hive
hive-it-util
${project.version}
@@ -167,6 +172,13 @@
test-jar
test
+
+ org.apache.hive.hcatalog
+ hive-webhcat-java-client
+ ${project.version}
+ test-jar
+ test
+
junit
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 05c1244565..323b85a274 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -38,6 +38,7 @@
import org.apache.hadoop.hive.ql.parse.ReplicationSpec.ReplStateMap;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.hcatalog.api.repl.ReplicationV1CompatRule;
import org.apache.thrift.TException;
import org.junit.After;
import org.junit.AfterClass;
@@ -46,6 +47,7 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
+import org.junit.rules.TestRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,6 +84,12 @@
private static Driver driver;
private static HiveMetaStoreClient metaStoreClient;
+ @Rule
+ public TestRule replV1BackwardCompatibleRule =
+ new ReplicationV1CompatRule(metaStoreClient, hconf,
+ new ArrayList<>(Arrays.asList("testEventFilters")));
+ // Make sure we skip backward-compat checking for those tests that don't generate events
+
protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
private ArrayList lastResults;
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index e4cc799aca..5621f26dd4 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -23,12 +23,17 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
+import org.junit.rules.TestRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestReplicationScenariosAcrossInstances {
@Rule
public final TestName testName = new TestName();
+
+ @Rule
+ public TestRule replV1BackwardCompat = primary.getReplivationV1CompatRule();
+
protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
private static WarehouseInstance primary, replica;
@@ -88,4 +93,5 @@ public void testBootstrapFunctionReplication() throws Throwable {
.run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "*'")
.verify(replicatedDbName + ".testFunction");
}
+
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index e26d54ce39..f8bb24884d 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -28,7 +28,9 @@
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.hcatalog.api.repl.ReplicationV1CompatRule;
import org.apache.hive.hcatalog.listener.DbNotificationListener;
+import org.junit.rules.TestRule;
import java.io.IOException;
import java.util.ArrayList;
@@ -40,6 +42,8 @@
private Driver driver;
private HiveMetaStoreClient client;
private HiveConf hconf;
+ private ReplicationV1CompatRule bcompat = null;
+
private final static String LISTENER_CLASS = DbNotificationListener.class.getCanonicalName();
@@ -51,6 +55,7 @@
this.driver = other.driver;
this.client = other.client;
this.hconf = other.hconf;
+ this.bcompat = other.bcompat;
}
WarehouseInstance() throws Exception {
@@ -90,6 +95,8 @@
driver = new Driver(hconf);
SessionState.start(new CliSessionState(hconf));
client = new HiveMetaStoreClient(hconf);
+
+ bcompat = new ReplicationV1CompatRule(client,hconf);
}
private int next = 0;
@@ -179,6 +186,14 @@ private void printOutput() throws IOException {
}
}
+ public TestRule getReplivationV1CompatRule(){
+ return bcompat;
+ }
+
+ public void doBackwardCompatibilityCheck(boolean eventsMustExist) {
+ bcompat.doBackwardCompatibilityCheck(eventsMustExist);
+ }
+
static class Tuple {
final String dumpLocation;
final String lastReplicationId;