diff --git a/hcatalog/webhcat/java-client/pom.xml b/hcatalog/webhcat/java-client/pom.xml
index 3bb9f4d543..9726df1413 100644
--- a/hcatalog/webhcat/java-client/pom.xml
+++ b/hcatalog/webhcat/java-client/pom.xml
@@ -82,5 +82,27 @@
test
-
+
+ ${basedir}/src/main/java
+ ${basedir}/src/test
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+
+ test-jar
+
+
+
+
+ *.xml
+
+
+
+
+
+
+
diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ErroredReplicationTask.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ErroredReplicationTask.java
new file mode 100644
index 0000000000..f30e9128a0
--- /dev/null
+++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ErroredReplicationTask.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.api.repl;
+
+import org.apache.hive.hcatalog.api.HCatNotificationEvent;
+
+/**
+ * ErroredReplicationTask is a special kind of NoopReplicationTask in that it
+ * is not actionable, and wraps an error that might have occurred during Task
+ * instantiation time. This is used to protect "future" events that we know
+ * nothing about from breaking the system by throwing IllegalStateExceptions.
+ *
+ * Whether or not the user intends to do something with these tasks and act
+ * upon the exceptions is left to the user to determine how they can best use them.
+ *
+ */
+public class ErroredReplicationTask extends NoopReplicationTask {
+
+ RuntimeException errorCause = null;
+
+ public ErroredReplicationTask(HCatNotificationEvent event, RuntimeException e) {
+ super(event);
+ this.errorCause = e;
+ }
+
+ public RuntimeException getCause(){
+ return this.errorCause;
+ }
+
+ @Override
+ public boolean isActionable(){
+ return false;
+ }
+
+}
diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/EximReplicationTaskFactory.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/EximReplicationTaskFactory.java
index 64ddae2ad9..fd0c0feba1 100644
--- a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/EximReplicationTaskFactory.java
+++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/EximReplicationTaskFactory.java
@@ -20,7 +20,7 @@
import org.apache.hive.hcatalog.api.HCatClient;
import org.apache.hive.hcatalog.api.HCatNotificationEvent;
-import org.apache.hive.hcatalog.api.repl.NoopReplicationTask;
+import org.apache.hive.hcatalog.api.repl.ErroredReplicationTask;
import org.apache.hive.hcatalog.api.repl.ReplicationTask;
import org.apache.hive.hcatalog.common.HCatConstants;
@@ -57,7 +57,8 @@ public ReplicationTask create(HCatClient client, HCatNotificationEvent event){
} else if (event.getEventType().equals(HCatConstants.HCAT_INSERT_EVENT)) {
return new InsertReplicationTask(event);
} else {
- throw new IllegalStateException("Unrecognized Event type, no replication task available");
+ return new ErroredReplicationTask(event, new IllegalStateException(
+ "Unrecognized Event type, no replication task available"));
}
}
}
diff --git a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/ReplicationV1BackwardCompatChecker.java b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/ReplicationV1BackwardCompatChecker.java
new file mode 100644
index 0000000000..740e0e71e2
--- /dev/null
+++ b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/ReplicationV1BackwardCompatChecker.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.api.repl;
+
+import com.google.common.primitives.Ints;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.EventUtils;
+import org.apache.hive.hcatalog.api.HCatClient;
+import org.apache.hive.hcatalog.api.HCatNotificationEvent;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Utility class to enable testing of Replv1 backward compatibility testing.
+ */
+public class ReplicationV1BackwardCompatChecker {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(ReplicationV1BackwardCompatChecker.class);
+
+ private static ThreadLocal testEventId = null;
+ private IMetaStoreClient metaStoreClient = null;
+ private HiveConf hconf = null;
+
+
+ public ReplicationV1BackwardCompatChecker(IMetaStoreClient metaStoreClient, HiveConf hconf){
+ this.metaStoreClient = metaStoreClient;
+ this.hconf = hconf;
+ testEventId = new ThreadLocal(){
+ @Override
+ protected Long initialValue(){
+ Long l = getCurrentNotificationId();
+ return (l == null ? 0 : l );
+ }
+ };
+ LOG.info("Replv1 backward compatibility tester initialized at " + testEventId.get());
+ }
+
+ private Long getCurrentNotificationId(){
+ CurrentNotificationEventId cid = null;
+ try {
+ cid = metaStoreClient.getCurrentNotificationEventId();
+ return cid.getEventId();
+ } catch (TException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Helper method to verify that all events generated since last call are compatible with replv1.
+ * If this is called multiple times, it does this check for all events incurred since the last
+ * time it was called.
+ */
+ public void doBackwardCompatibilityCheck(boolean eventsMustExist) {
+ // try to instantiate the old replv1 task generation on every event produced.
+ long timeBefore = System.currentTimeMillis();
+
+ Long testEventIdPrev = testEventId.get();
+ Long testEventIdNow = getCurrentNotificationId();
+
+ if (testEventIdPrev == null){
+ testEventIdPrev = 0L;
+ }
+ if (testEventIdNow == null){
+ testEventIdNow = 0L;
+ }
+
+ testEventId.set(testEventIdNow);
+
+ if (eventsMustExist){
+ assertTrue("New events must exist between old["
+ + testEventIdPrev + "] and [" + testEventIdNow + "]",
+ testEventIdNow > testEventIdPrev);
+ } else if (testEventIdNow <= testEventIdPrev){
+ return; // nothing further to test.
+ }
+
+ Map unhandledTasks = new LinkedHashMap<>();
+ Map incompatibleTasks = new LinkedHashMap<>();
+ int eventCount = 0;
+
+ LOG.info( "Checking replv1 backward compatibility for events between : "
+ + testEventIdPrev + " -> " + testEventIdNow);
+ IMetaStoreClient.NotificationFilter evFilter =
+ new IMetaStoreClient.NotificationFilter() {
+ @Override
+ public boolean accept(NotificationEvent notificationEvent) {
+ return true;
+ }
+ };
+ EventUtils.MSClientNotificationFetcher evFetcher =
+ new EventUtils.MSClientNotificationFetcher(metaStoreClient);
+ try {
+ EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(
+ evFetcher, testEventIdPrev,
+ Ints.checkedCast(testEventIdNow - testEventIdPrev) + 1,
+ evFilter);
+ ReplicationTask.resetFactory(null);
+ assertTrue("We should have found some events",evIter.hasNext());
+ while (evIter.hasNext()){
+ eventCount++;
+ NotificationEvent ev = evIter.next();
+ // convert to HCatNotificationEvent, and then try to instantiate a ReplicationTask on it.
+ try {
+ ReplicationTask rtask = ReplicationTask.create(HCatClient.create(hconf), new HCatNotificationEvent(ev));
+ if (rtask instanceof ErroredReplicationTask) {
+ unhandledTasks.put(ev, ((ErroredReplicationTask) rtask).getCause());
+ }
+ } catch (RuntimeException re){
+ incompatibleTasks.put(ev, re);
+ }
+ }
+ } catch (IOException e) {
+ assertNull("Got an exception when we shouldn't have - replv1 backward incompatibility issue:",e);
+ }
+
+ if (unhandledTasks.size() > 0){
+ LOG.warn("Events found that would not be coverable by replv1 replication: " + unhandledTasks.size());
+ for (NotificationEvent ev : unhandledTasks.keySet()){
+ RuntimeException re = unhandledTasks.get(ev);
+ LOG.warn(
+ "ErroredReplicationTask encountered - new event type does not correspond to a replv1 task:"
+ + ev.toString(), re);
+ }
+ }
+ if (incompatibleTasks.size() > 0){
+ LOG.warn("Events found that caused errors in replv1 replication: " + incompatibleTasks.size());
+ for (NotificationEvent ev : incompatibleTasks.keySet()){
+ RuntimeException re = incompatibleTasks.get(ev);
+ LOG.warn(
+ "RuntimeException encountered - new event type caused a replv1 break."
+ + ev.toString(), re);
+ }
+ }
+ assertEquals(0,incompatibleTasks.size());
+
+ long timeAfter = System.currentTimeMillis();
+ LOG.info("Backward compatibility check timing:" + timeBefore + " -> " + timeAfter
+ + ", ev: " + testEventIdPrev + " => " + testEventIdNow
+ + ", #events processed=" + eventCount);
+ }
+}
diff --git a/itests/hcatalog-unit/pom.xml b/itests/hcatalog-unit/pom.xml
index c157aedca2..bb6b10554f 100644
--- a/itests/hcatalog-unit/pom.xml
+++ b/itests/hcatalog-unit/pom.xml
@@ -72,6 +72,13 @@
test
+ org.apache.hive.hcatalog
+ hive-webhcat-java-client
+ ${project.version}
+ test-jar
+ test
+
+
org.apache.hive
hive-hbase-handler
${project.version}
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
index 50d8878e1c..514468b87b 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
@@ -94,11 +94,14 @@
import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.hcatalog.api.repl.ReplicationV1BackwardCompatChecker;
import org.apache.hive.hcatalog.data.Pair;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
import org.junit.Test;
+import org.junit.runners.MethodSorters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -106,6 +109,7 @@
* Tests DbNotificationListener when used as a transactional event listener
* (hive.metastore.transactional.event.listeners)
*/
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class TestDbNotificationListener {
private static final Logger LOG = LoggerFactory.getLogger(TestDbNotificationListener.class
.getName());
@@ -118,6 +122,8 @@
private int startTime;
private long firstEventId;
+ private static ReplicationV1BackwardCompatChecker bcompat = null;
+
/* This class is used to verify that HiveMetaStore calls the non-transactional listeners with the
* current event ID set by the DbNotificationListener class */
public static class MockMetaStoreEventListener extends MetaStoreEventListener {
@@ -238,6 +244,8 @@ public static void connectToMetastore() throws Exception {
msClient = new HiveMetaStoreClient(conf);
driver = new Driver(conf);
md = MessageFactory.getInstance().getDeserializer();
+
+ bcompat = new ReplicationV1BackwardCompatChecker(msClient, conf);
}
@Before
@@ -1553,4 +1561,22 @@ public void cleanupNotifs() throws Exception {
LOG.info("second trigger done");
assertEquals(0, rsp2.getEventsSize());
}
+
+ /**
+ * This test is named starting with a zzz because we want it to alphabetically be the last test run.
+ * This is enforced because we want it to analyze all the events generated.
+ *
+ * Outside of rolling out our own TestRunner, this is the easiest way to ensure that we execute all
+ * other tests that generate the events we want to test before we test those events for backward
+ * compatibility.
+ *
+ * The main problem we have with this approach, however, is if junit tests are parallelized, then
+ * this might wind up testing only those events that have been generated by the time it runs.
+ *
+ * Also, we cannot add this in @After or @AfterClass, because those do not count for "test failure".
+ */
+ @Test
+ public void zzztestBackwardCompatibilityCheck() {
+ bcompat.doBackwardCompatibilityCheck(true);
+ }
}
diff --git a/itests/hive-unit/pom.xml b/itests/hive-unit/pom.xml
index 8be25b2401..8adf309430 100644
--- a/itests/hive-unit/pom.xml
+++ b/itests/hive-unit/pom.xml
@@ -86,6 +86,11 @@
${project.version}
+ org.apache.hive.hcatalog
+ hive-webhcat-java-client
+ ${project.version}
+
+
org.apache.hive
hive-it-util
${project.version}
@@ -167,6 +172,13 @@
test-jar
test
+
+ org.apache.hive.hcatalog
+ hive-webhcat-java-client
+ ${project.version}
+ test-jar
+ test
+
junit
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 05c1244565..9f5c2c983a 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -38,14 +38,17 @@
import org.apache.hadoop.hive.ql.parse.ReplicationSpec.ReplStateMap;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.hcatalog.api.repl.ReplicationV1BackwardCompatChecker;
import org.apache.thrift.TException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
+import org.junit.runners.MethodSorters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,6 +66,7 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class TestReplicationScenarios {
@Rule
@@ -81,6 +85,7 @@
private static int msPort;
private static Driver driver;
private static HiveMetaStoreClient metaStoreClient;
+ private static ReplicationV1BackwardCompatChecker bcompat = null;
protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
private ArrayList lastResults;
@@ -126,6 +131,7 @@ public static void setUpBeforeClass() throws Exception {
driver = new Driver(hconf);
SessionState.start(new CliSessionState(hconf));
metaStoreClient = new HiveMetaStoreClient(hconf);
+ bcompat = new ReplicationV1BackwardCompatChecker(metaStoreClient,hconf);
}
@AfterClass
@@ -173,6 +179,24 @@ private Tuple loadAndVerify(String dbName) throws IOException {
}
/**
+ * This test is named starting with a zzz because we want it to alphabetically be the last test run.
+ * This is enforced because we want it to analyze all the events generated.
+ *
+ * Outside of rolling out our own TestRunner, this is the easiest way to ensure that we execute all
+ * other tests that generate the events we want to test before we test those events for backward
+ * compatibility.
+ *
+ * The main problem we have with this approach, however, is if junit tests are parallelized, then
+ * this might wind up testing only those events that have been generated by the time it runs.
+ *
+ * Also, we cannot add this in @After or @AfterClass, because those do not count for "test failure".
+ */
+ @Test
+ public void zzztestBackwardCompatibilityCheck() {
+ bcompat.doBackwardCompatibilityCheck(true);
+ }
+
+ /**
* Tests basic operation - creates a db, with 4 tables, 2 ptned and 2 unptned.
* Inserts data into one of the ptned tables, and one of the unptned tables,
* and verifies that a REPL DUMP followed by a REPL LOAD is able to load it
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index e4cc799aca..3e41468f58 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -20,12 +20,15 @@
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
+import org.junit.runners.MethodSorters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class TestReplicationScenariosAcrossInstances {
@Rule
public final TestName testName = new TestName();
@@ -88,4 +91,22 @@ public void testBootstrapFunctionReplication() throws Throwable {
.run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "*'")
.verify(replicatedDbName + ".testFunction");
}
+
+ /**
+ * This test is named starting with a zzz because we want it to alphabetically be the last test run.
+ * This is enforced because we want it to analyze all the events generated.
+ *
+ * Outside of rolling out our own TestRunner, this is the easiest way to ensure that we execute all
+ * other tests that generate the events we want to test before we test those events for backward
+ * compatibility.
+ *
+ * The main problem we have with this approach, however, is if junit tests are parallelized, then
+ * this might wind up testing only those events that have been generated by the time it runs.
+ *
+ * Also, we cannot add this in @After or @AfterClass, because those do not count for "test failure".
+ */
+ @Test
+ public void zzztestBackwardCompatibilityCheck() {
+ primary.doBackwardCompatibilityCheck(true);
+ }
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index e26d54ce39..5a3a3479d8 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.hcatalog.api.repl.ReplicationV1BackwardCompatChecker;
import org.apache.hive.hcatalog.listener.DbNotificationListener;
import java.io.IOException;
@@ -40,6 +41,8 @@
private Driver driver;
private HiveMetaStoreClient client;
private HiveConf hconf;
+ private ReplicationV1BackwardCompatChecker bcompat = null;
+
private final static String LISTENER_CLASS = DbNotificationListener.class.getCanonicalName();
@@ -51,6 +54,7 @@
this.driver = other.driver;
this.client = other.client;
this.hconf = other.hconf;
+ this.bcompat = other.bcompat;
}
WarehouseInstance() throws Exception {
@@ -90,6 +94,8 @@
driver = new Driver(hconf);
SessionState.start(new CliSessionState(hconf));
client = new HiveMetaStoreClient(hconf);
+
+ bcompat = new ReplicationV1BackwardCompatChecker(client,hconf);
}
private int next = 0;
@@ -179,6 +185,10 @@ private void printOutput() throws IOException {
}
}
+ public void doBackwardCompatibilityCheck(boolean eventsMustExist) {
+ bcompat.doBackwardCompatibilityCheck(eventsMustExist);
+ }
+
static class Tuple {
final String dumpLocation;
final String lastReplicationId;