diff --git a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/ReplicationV1CompatRule.java b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/ReplicationV1CompatRule.java index d5e227c..550a5e5 100644 --- a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/ReplicationV1CompatRule.java +++ b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/ReplicationV1CompatRule.java @@ -22,8 +22,10 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; +import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.metastore.messaging.EventUtils; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.events.EventUtils; import org.apache.hive.hcatalog.api.HCatClient; import org.apache.hive.hcatalog.api.HCatNotificationEvent; import org.apache.thrift.TException; @@ -43,6 +45,9 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + /** * Utility class to enable testing of Replv1 compatibility testing. * @@ -65,6 +70,8 @@ private HiveConf hconf = null; private List testsToSkip = null; + private Hive hiveDb; + public ReplicationV1CompatRule(IMetaStoreClient metaStoreClient, HiveConf hconf){ this(metaStoreClient, hconf, new ArrayList()); } @@ -79,6 +86,7 @@ protected Long initialValue(){ }; this.testsToSkip = testsToSkip; LOG.info("Replv1 backward compatibility tester initialized at " + testEventId.get()); + this.hiveDb = mock(Hive.class); } private Long getCurrentNotificationId(){ @@ -137,16 +145,17 @@ public boolean accept(NotificationEvent notificationEvent) { return true; } }; - EventUtils.MSClientNotificationFetcher evFetcher = - new EventUtils.MSClientNotificationFetcher(metaStoreClient); try { + when(hiveDb.getMSC()).thenReturn(metaStoreClient); + EventUtils.MSClientNotificationFetcher evFetcher = + new EventUtils.MSClientNotificationFetcher(hiveDb); EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator( - evFetcher, testEventIdBefore, - Ints.checkedCast(testEventIdAfter - testEventIdBefore) + 1, - evFilter); + evFetcher, testEventIdBefore, + Ints.checkedCast(testEventIdAfter - testEventIdBefore) + 1, + evFilter); ReplicationTask.resetFactory(null); - assertTrue("We should have found some events",evIter.hasNext()); - while (evIter.hasNext()){ + assertTrue("We should have found some events", evIter.hasNext()); + while (evIter.hasNext()) { eventCount++; NotificationEvent ev = evIter.next(); // convert to HCatNotificationEvent, and then try to instantiate a ReplicationTask on it. @@ -155,11 +164,11 @@ public boolean accept(NotificationEvent notificationEvent) { if (rtask instanceof ErroredReplicationTask) { unhandledTasks.put(ev, ((ErroredReplicationTask) rtask).getCause()); } - } catch (RuntimeException re){ + } catch (RuntimeException re) { incompatibleTasks.put(ev, re); } } - } catch (IOException e) { + } catch (IOException | MetaException e) { assertNull("Got an exception when we shouldn't have - replv1 backward incompatibility issue:",e); } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index 1e0efe7..4b6bc77 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -76,6 +76,7 @@ import org.apache.hadoop.hive.ql.ErrorMsg; import javax.annotation.Nullable; + import java.io.File; import java.io.FileWriter; import java.io.IOException; @@ -1556,7 +1557,7 @@ public void testInsertToMultiKeyPartition() throws IOException { run("USE " + replDbName, driverMirror); verifyRunWithPatternMatch("SHOW TABLE EXTENDED LIKE namelist PARTITION (year=1990,month=5,day=25)", "location", "namelist/year=1990/month=5/day=25", driverMirror); - run("USE " + dbName, driverMirror); + run("USE " + dbName, driver); String[] ptn_data_3 = new String[] { "abraham", "bob", "carter", "david", "fisher" }; String[] data_after_ovwrite = new String[] { "fisher" }; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java index d29c4da..783e4e5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java @@ -37,14 +37,12 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Stream; @@ -62,13 +60,11 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.metastore.messaging.EventUtils; import org.apache.hadoop.hive.metastore.messaging.MessageFactory; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.hooks.Entity.Type; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.io.AcidUtils; -import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.events.EventConsumer; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 4eba910..8727294 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hive.ql.exec.repl; -import com.google.common.primitives.Ints; - import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidTxnList; @@ -33,7 +31,6 @@ import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint; import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint; -import org.apache.hadoop.hive.metastore.messaging.EventUtils; import org.apache.hadoop.hive.metastore.messaging.MessageFactory; import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter; import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter; @@ -48,6 +45,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.InvalidTableException; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.metadata.events.EventUtils; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer; @@ -160,7 +158,7 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throw new MessageFormatFilter(MessageFactory.getInstance().getMessageFormat())); EventUtils.MSClientNotificationFetcher evFetcher - = new EventUtils.MSClientNotificationFetcher(getHive().getMSC()); + = new EventUtils.MSClientNotificationFetcher(getHive()); EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator( evFetcher, work.eventFrom, work.maxEventLimit(), evFilter); @@ -247,30 +245,8 @@ private Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws replLogger.endLog(bootDumpBeginReplId.toString()); } Long bootDumpEndReplId = hiveDb.getMSC().getCurrentNotificationEventId().getEventId(); - LOG.info("Bootstrap object dump phase took from {} to {}", bootDumpBeginReplId, - bootDumpEndReplId); - - // Now that bootstrap has dumped all objects related, we have to account for the changes - // that occurred while bootstrap was happening - i.e. we have to look through all events - // during the bootstrap period and consolidate them with our dump. - - IMetaStoreClient.NotificationFilter evFilter = - new DatabaseAndTableFilter(work.dbNameOrPattern, work.tableNameOrPattern); - EventUtils.MSClientNotificationFetcher evFetcher = - new EventUtils.MSClientNotificationFetcher(hiveDb.getMSC()); - EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator( - evFetcher, bootDumpBeginReplId, - Ints.checkedCast(bootDumpEndReplId - bootDumpBeginReplId) + 1, - evFilter); - // Now we consolidate all the events that happenned during the objdump into the objdump - while (evIter.hasNext()) { - NotificationEvent ev = evIter.next(); - Path eventRoot = new Path(dumpRoot, String.valueOf(ev.getEventId())); - // FIXME : implement consolidateEvent(..) similar to dumpEvent(ev,evRoot) - } - LOG.info( - "Consolidation done, preparing to return {},{}->{}", + LOG.info("Preparing to return {},{}->{}", dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId); dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot); dmd.write(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 953cd1d..8c8af47 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -457,9 +457,8 @@ private void close() { metaStoreClient.close(); metaStoreClient = null; } - if (syncMetaStoreClient != null) { - syncMetaStoreClient.close(); - } + // syncMetaStoreClient is wrapped on metaStoreClient. So, it is enough to close it once. + syncMetaStoreClient = null; if (owner != null) { owner = null; } diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventUtils.java similarity index 92% rename from standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java rename to ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventUtils.java index 2b16897..66abd51 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventUtils.java @@ -16,13 +16,14 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.hadoop.hive.metastore.messaging; +package org.apache.hadoop.hive.ql.metadata.events; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter; +import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.thrift.TException; import java.io.IOException; @@ -43,11 +44,11 @@ // MetaStoreClient-based impl of NotificationFetcher public static class MSClientNotificationFetcher implements NotificationFetcher{ - private IMetaStoreClient msc = null; + private Hive hiveDb = null; private Integer batchSize = null; - public MSClientNotificationFetcher(IMetaStoreClient msc){ - this.msc = msc; + public MSClientNotificationFetcher(Hive hiveDb){ + this.hiveDb = hiveDb; } @Override @@ -55,7 +56,7 @@ public int getBatchSize() throws IOException { if (batchSize == null){ try { batchSize = Integer.parseInt( - msc.getConfigValue(MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX.toString(), "50")); + hiveDb.getMSC().getConfigValue(MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX.toString(), "50")); // TODO: we're asking the metastore what its configuration for this var is - we may // want to revisit to pull from client side instead. The reason I have it this way // is because the metastore is more likely to have a reasonable config for this than @@ -70,7 +71,7 @@ public int getBatchSize() throws IOException { @Override public long getCurrentNotificationEventId() throws IOException { try { - return msc.getCurrentNotificationEventId().getEventId(); + return hiveDb.getMSC().getCurrentNotificationEventId().getEventId(); } catch (TException e) { throw new IOException(e); } @@ -81,7 +82,7 @@ public long getDbNotificationEventsCount(long fromEventId, String dbName) throws try { NotificationEventsCountRequest rqst = new NotificationEventsCountRequest(fromEventId, dbName); - return msc.getNotificationEventsCount(rqst).getEventsCount(); + return hiveDb.getMSC().getNotificationEventsCount(rqst).getEventsCount(); } catch (TException e) { throw new IOException(e); } @@ -91,7 +92,7 @@ public long getDbNotificationEventsCount(long fromEventId, String dbName) throws public List getNextNotificationEvents( long pos, IMetaStoreClient.NotificationFilter filter) throws IOException { try { - return msc.getNextNotification(pos,getBatchSize(), filter).getEvents(); + return hiveDb.getMSC().getNextNotification(pos,getBatchSize(), filter).getEvents(); } catch (TException e) { throw new IOException(e.getMessage(), e); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/NotificationEventPoll.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/NotificationEventPoll.java index c35ca44..010f00c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/NotificationEventPoll.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/NotificationEventPoll.java @@ -26,15 +26,12 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.metastore.messaging.EventUtils; import org.apache.hadoop.hive.ql.metadata.Hive; -import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandler; import org.apache.hadoop.util.ReflectionUtils; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -99,7 +96,7 @@ private NotificationEventPoll(Configuration conf) throws Exception { } EventUtils.MSClientNotificationFetcher evFetcher - = new EventUtils.MSClientNotificationFetcher(Hive.get().getMSC()); + = new EventUtils.MSClientNotificationFetcher(Hive.get()); lastCheckedEventId = evFetcher.getCurrentNotificationEventId(); LOG.info("Initializing lastCheckedEventId to {}", lastCheckedEventId); @@ -135,7 +132,7 @@ public void run() { // Get any new notification events that have been since the last time we checked, // And pass them on to the event handlers. EventUtils.MSClientNotificationFetcher evFetcher - = new EventUtils.MSClientNotificationFetcher(Hive.get().getMSC()); + = new EventUtils.MSClientNotificationFetcher(Hive.get()); EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(evFetcher, lastCheckedEventId, 0, "*", null);