diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index f24bcbd8a2..487208054a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -111,14 +111,15 @@ public String getName() {
   @Override
   protected int execute(DriverContext driverContext) {
     try {
+      Hive hiveDb = getHive();
       Path dumpRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR), getNextDumpDir());
       DumpMetaData dmd = new DumpMetaData(dumpRoot, conf);
       Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR));
       Long lastReplId;
       if (work.isBootStrapDump()) {
-        lastReplId = bootStrapDump(dumpRoot, dmd, cmRoot);
+        lastReplId = bootStrapDump(dumpRoot, dmd, cmRoot, hiveDb);
       } else {
-        lastReplId = incrementalDump(dumpRoot, dmd, cmRoot);
+        lastReplId = incrementalDump(dumpRoot, dmd, cmRoot, hiveDb);
       }
       prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), String.valueOf(lastReplId)), dumpSchema);
     } catch (RuntimeException e) {
@@ -141,7 +142,7 @@ private void prepareReturnValues(List<String> values, String schema) throws Sema
     Utils.writeOutput(values, new Path(work.resultTempPath), conf);
   }
 
-  private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws Exception {
+  private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) throws Exception {
     Long lastReplId;// get list of events matching dbPattern & tblPattern
     // go through each event, and dump out each event to a event-level dump dir inside dumproot
@@ -151,7 +152,7 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throw
     // same factory, restricting by message format is effectively a guard against
     // older leftover data that would cause us problems.
-    work.overrideEventTo(getHive());
+    work.overrideEventTo(hiveDb);
 
     IMetaStoreClient.NotificationFilter evFilter = new AndFilter(
         new DatabaseAndTableFilter(work.dbNameOrPattern, work.tableNameOrPattern),
@@ -159,7 +160,7 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throw
         new MessageFormatFilter(MessageFactory.getInstance().getMessageFormat()));
 
     EventUtils.MSClientNotificationFetcher evFetcher
-        = new EventUtils.MSClientNotificationFetcher(getHive());
+        = new EventUtils.MSClientNotificationFetcher(hiveDb);
 
     EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(
         evFetcher, work.eventFrom, work.maxEventLimit(), evFilter);
@@ -175,7 +176,7 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throw
       NotificationEvent ev = evIter.next();
       lastReplId = ev.getEventId();
       Path evRoot = new Path(dumpRoot, String.valueOf(lastReplId));
-      dumpEvent(ev, evRoot, cmRoot);
+      dumpEvent(ev, evRoot, cmRoot, hiveDb);
     }
 
     replLogger.endLog(lastReplId.toString());
@@ -193,11 +194,11 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throw
     return lastReplId;
   }
 
-  private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot) throws Exception {
+  private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot, Hive db) throws Exception {
     EventHandler.Context context = new EventHandler.Context(
         evRoot,
         cmRoot,
-        getHive(),
+        db,
         conf,
         getNewEventOnlyReplicationSpec(ev.getEventId()),
         work.dbNameOrPattern,
@@ -216,12 +217,11 @@ private ReplicationSpec getNewEventOnlyReplicationSpec(Long eventId) {
     return rspec;
   }
 
-  Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws Exception {
+  Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) throws Exception {
     // bootstrap case
     // Last repl id would've been captured during compile phase in queryState configs before opening txn.
     // This is needed as we dump data on ACID/MM tables based on read snapshot or else we may lose data from
     // concurrent txns when bootstrap dump in progress. If it is not available, then get it from metastore.
-    Hive hiveDb = getHive();
     Long bootDumpBeginReplId = queryState.getConf().getLong(ReplicationSemanticAnalyzer.LAST_REPL_ID_KEY, -1L);
     assert (bootDumpBeginReplId >= 0L);
@@ -229,11 +229,11 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws Exceptio
     for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) {
       LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName);
       replLogger = new BootstrapDumpLogger(dbName, dumpRoot.toString(),
-          Utils.getAllTables(getHive(), dbName).size(),
-          getHive().getAllFunctions().size());
+          Utils.getAllTables(hiveDb, dbName).size(),
+          hiveDb.getAllFunctions().size());
       replLogger.startLog();
-      Path dbRoot = dumpDbMetadata(dbName, dumpRoot, bootDumpBeginReplId);
-      dumpFunctionMetadata(dbName, dumpRoot);
+      Path dbRoot = dumpDbMetadata(dbName, dumpRoot, bootDumpBeginReplId, hiveDb);
+      dumpFunctionMetadata(dbName, dumpRoot, hiveDb);
       String uniqueKey = Utils.setDbBootstrapDumpState(hiveDb, dbName);
       Exception caught = null;
@@ -241,8 +241,8 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws Exceptio
         for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.tableNameOrPattern)) {
           LOG.debug(
               "analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri());
-          dumpTable(dbName, tblName, validTxnList, dbRoot, bootDumpBeginReplId);
-          dumpConstraintMetadata(dbName, tblName, dbRoot);
+          dumpTable(dbName, tblName, validTxnList, dbRoot, bootDumpBeginReplId, hiveDb);
+          dumpConstraintMetadata(dbName, tblName, dbRoot, hiveDb);
         }
       } catch (Exception e) {
         caught = e;
@@ -279,20 +279,19 @@ long currentNotificationId(Hive hiveDb) throws TException {
     return hiveDb.getMSC().getCurrentNotificationEventId().getEventId();
   }
 
-  Path dumpDbMetadata(String dbName, Path dumpRoot, long lastReplId) throws Exception {
+  Path dumpDbMetadata(String dbName, Path dumpRoot, long lastReplId, Hive hiveDb) throws Exception {
     Path dbRoot = new Path(dumpRoot, dbName);
     // TODO : instantiating FS objects are generally costly. Refactor
     FileSystem fs = dbRoot.getFileSystem(conf);
     Path dumpPath = new Path(dbRoot, EximUtil.METADATA_NAME);
-    HiveWrapper.Tuple<Database> database = new HiveWrapper(getHive(), dbName, lastReplId).database();
+    HiveWrapper.Tuple<Database> database = new HiveWrapper(hiveDb, dbName, lastReplId).database();
     EximUtil.createDbExportDump(fs, dumpPath, database.object, database.replicationSpec);
     return dbRoot;
   }
 
-  void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, long lastReplId) throws Exception {
+  void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, long lastReplId, Hive hiveDb) throws Exception {
     try {
-      Hive db = getHive();
-      HiveWrapper.Tuple<Table> tuple = new HiveWrapper(db, dbName).table(tblName);
+      HiveWrapper.Tuple<Table> tuple = new HiveWrapper(hiveDb, dbName).table(tblName);
       TableSpec tableSpec = new TableSpec(tuple.object);
       TableExport.Paths exportPaths =
           new TableExport.Paths(work.astRepresentationForErrorMsg, dbRoot, tblName, conf, true);
@@ -308,7 +307,7 @@ void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot,
       }
       MmContext mmCtx = MmContext.createIfNeeded(tableSpec.tableHandle);
       new TableExport(
-          exportPaths, tableSpec, tuple.replicationSpec, db, distCpDoAsUser, conf, mmCtx).write();
+          exportPaths, tableSpec, tuple.replicationSpec, hiveDb, distCpDoAsUser, conf, mmCtx).write();
 
       replLogger.tableLog(tblName, tableSpec.tableHandle.getTableType());
@@ -406,11 +405,11 @@ private String getNextDumpDir() {
     }
   }
 
-  void dumpFunctionMetadata(String dbName, Path dumpRoot) throws Exception {
+  void dumpFunctionMetadata(String dbName, Path dumpRoot, Hive hiveDb) throws Exception {
     Path functionsRoot = new Path(new Path(dumpRoot, dbName), FUNCTIONS_ROOT_DIR_NAME);
-    List<String> functionNames = getHive().getFunctions(dbName, "*");
+    List<String> functionNames = hiveDb.getFunctions(dbName, "*");
     for (String functionName : functionNames) {
-      HiveWrapper.Tuple<Function> tuple = functionTuple(functionName, dbName);
+      HiveWrapper.Tuple<Function> tuple = functionTuple(functionName, dbName, hiveDb);
       if (tuple == null) {
         continue;
       }
@@ -425,16 +424,15 @@ void dumpFunctionMetadata(String dbName, Path dumpRoot) throws Exception {
     }
   }
 
-  void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot) throws Exception {
+  void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot, Hive hiveDb) throws Exception {
     try {
       Path constraintsRoot = new Path(dbRoot, CONSTRAINTS_ROOT_DIR_NAME);
       Path commonConstraintsFile = new Path(constraintsRoot, ConstraintFileType.COMMON.getPrefix() + tblName);
       Path fkConstraintsFile = new Path(constraintsRoot, ConstraintFileType.FOREIGNKEY.getPrefix() + tblName);
-      Hive db = getHive();
-      List<SQLPrimaryKey> pks = db.getPrimaryKeyList(dbName, tblName);
-      List<SQLForeignKey> fks = db.getForeignKeyList(dbName, tblName);
-      List<SQLUniqueConstraint> uks = db.getUniqueConstraintList(dbName, tblName);
-      List<SQLNotNullConstraint> nns = db.getNotNullConstraintList(dbName, tblName);
+      List<SQLPrimaryKey> pks = hiveDb.getPrimaryKeyList(dbName, tblName);
+      List<SQLForeignKey> fks = hiveDb.getForeignKeyList(dbName, tblName);
+      List<SQLUniqueConstraint> uks = hiveDb.getUniqueConstraintList(dbName, tblName);
+      List<SQLNotNullConstraint> nns = hiveDb.getNotNullConstraintList(dbName, tblName);
       if ((pks != null && !pks.isEmpty()) || (uks != null && !uks.isEmpty())
           || (nns != null && !nns.isEmpty())) {
         try (JsonWriter jsonWriter =
@@ -457,9 +455,9 @@ void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot) throws E
     }
   }
 
-  private HiveWrapper.Tuple<Function> functionTuple(String functionName, String dbName) {
+  private HiveWrapper.Tuple<Function> functionTuple(String functionName, String dbName, Hive hiveDb) {
     try {
-      HiveWrapper.Tuple<Function> tuple = new HiveWrapper(getHive(), dbName).function(functionName);
+      HiveWrapper.Tuple<Function> tuple = new HiveWrapper(hiveDb, dbName).function(functionName);
       if (tuple.object.getResourceUris().isEmpty()) {
         LOG.warn("Not replicating function: " + functionName + " as it seems to have been created "
             + "without USING clause");
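Review note on the shape of this change: it is a plain parameter-injection refactor. `execute()` resolves the `Hive` handle once via `getHive()` and threads it through every helper, where previously each helper re-resolved the handle itself. That keeps the entire dump on a single metastore session and, as the test diff below shows, gives callers a seam to substitute the handle. A minimal, self-contained sketch of the pattern, with hypothetical names (`Handle`, `stepA`, `stepB` stand in for `Hive` and the dump helpers; none of these are Hive classes):

```java
// Sketch of the refactor pattern only; "Handle" is a hypothetical stand-in
// for org.apache.hadoop.hive.ql.metadata.Hive.
final class Handle {
  private static final Handle INSTANCE = new Handle();
  static Handle get() { return INSTANCE; }  // analogous to Task.getHive()
}

class DumpBeforePatch {
  void run() { stepA(); stepB(); }
  // Each helper re-resolves its own handle: hard to stub in a test, and the
  // steps are not guaranteed to share one session.
  void stepA() { Handle h = Handle.get(); /* ... use h ... */ }
  void stepB() { Handle h = Handle.get(); /* ... use h ... */ }
}

class DumpAfterPatch {
  void run() {
    Handle h = Handle.get();  // resolved exactly once at the entry point
    stepA(h);
    stepB(h);
  }
  // Helpers receive the handle as an argument, so a test can pass a mock.
  void stepA(Handle h) { /* ... use h ... */ }
  void stepB(Handle h) { /* ... use h ... */ }
}
```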
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
index d5956ddb18..7a14544aab 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
@@ -79,16 +79,16 @@ String getValidTxnListForReplDump(Hive hiveDb) {
     }
 
     @Override
-    void dumpFunctionMetadata(String dbName, Path dumpRoot) {
+    void dumpFunctionMetadata(String dbName, Path dumpRoot, Hive hiveDb) {
     }
 
     @Override
-    Path dumpDbMetadata(String dbName, Path dumpRoot, long lastReplId) {
+    Path dumpDbMetadata(String dbName, Path dumpRoot, long lastReplId, Hive hiveDb) {
       return Mockito.mock(Path.class);
     }
 
     @Override
-    void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot) {
+    void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot, Hive hiveDb) {
     }
   }
@@ -116,7 +116,7 @@ public void removeDBPropertyToPreventRenameWhenBootstrapDumpOfTableFails() throw
       private int tableDumpCount = 0;
 
       @Override
-      void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, long lastReplId)
+      void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, long lastReplId, Hive hiveDb)
          throws Exception {
        tableDumpCount++;
        if (tableDumpCount > 1) {
@@ -133,7 +133,7 @@ void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot,
     );
 
     try {
-      task.bootStrapDump(mock(Path.class), null, mock(Path.class));
+      task.bootStrapDump(mock(Path.class), null, mock(Path.class), Hive.get());
     } finally {
       verifyStatic();
       Utils.resetDbBootstrapDumpState(same(hive), eq("default"), eq(dbRandomKey));
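One follow-up worth noting for test authors: the patched test still passes `Hive.get()` into `bootStrapDump`, but because every dump helper now takes the `Hive` handle as an argument, a unit test can instead inject a Mockito mock and avoid the thread-local state behind `Hive.get()` entirely. A hedged sketch of that injection point (construction of the task itself is elided; `mock(Hive.class)` follows the same Mockito style the existing test already uses for `Path`):

```java
import static org.mockito.Mockito.mock;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.metadata.Hive;

// Sketch only: shows the seam the new bootStrapDump signature opens up.
public class BootstrapDumpInjectionSketch {
  public static void main(String[] args) throws Exception {
    Hive mockDb = mock(Hive.class);   // stands in for a live metastore session
    Path dumpRoot = mock(Path.class);
    Path cmRoot = mock(Path.class);
    // With a ReplDumpTask instance in hand (construction elided), the whole
    // bootstrap path runs against mockDb -- no stubbing of the static
    // Hive.get() state is required:
    // task.bootStrapDump(dumpRoot, null, cmRoot, mockDb);
  }
}
```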