diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java index 26cdc6bcfc..ee1e1fd633 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; +import org.apache.hadoop.hive.ql.parse.repl.dump.log.AtlasDumpLogger; import org.apache.hadoop.hive.ql.plan.api.StageType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,7 +74,12 @@ public int execute() { String entityGuid = checkHiveEntityGuid(atlasRequestBuilder, atlasReplInfo.getSrcCluster(), atlasReplInfo.getSrcDB()); long currentModifiedTime = getCurrentTimestamp(atlasReplInfo, entityGuid); - dumpAtlasMetaData(atlasRequestBuilder, atlasReplInfo); + AtlasDumpLogger replLogger = new AtlasDumpLogger(atlasReplInfo.getSrcDB(), + atlasReplInfo.getStagingDir().toString()); + replLogger.startLog(); + long numBytesWritten = dumpAtlasMetaData(atlasRequestBuilder, atlasReplInfo); + LOG.debug("Finished dumping atlas metadata, total:{} bytes written", numBytesWritten); + replLogger.endLog(0L); createDumpMetadata(atlasReplInfo, currentModifiedTime); return 0; } catch (Exception e) { @@ -83,7 +89,7 @@ public int execute() { } } - private AtlasReplInfo createAtlasReplInfo() throws SemanticException, MalformedURLException { + public AtlasReplInfo createAtlasReplInfo() throws SemanticException, MalformedURLException { String errorFormat = "%s is mandatory config for Atlas metadata replication"; //Also validates URL for endpoint. 
String endpoint = new URL(ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, conf, errorFormat)) @@ -99,7 +105,7 @@ private AtlasReplInfo createAtlasReplInfo() throws SemanticException, MalformedU return atlasReplInfo; } - private long lastStoredTimeStamp() throws SemanticException { + public long lastStoredTimeStamp() throws SemanticException { Path prevMetadataPath = new Path(work.getPrevAtlasDumpDir(), EximUtil.METADATA_NAME); BufferedReader br = null; try { @@ -132,17 +138,17 @@ private long getCurrentTimestamp(AtlasReplInfo atlasReplInfo, String entityGuid) return ret; } - private void dumpAtlasMetaData(AtlasRequestBuilder atlasRequestBuilder, AtlasReplInfo atlasReplInfo) + public long dumpAtlasMetaData(AtlasRequestBuilder atlasRequestBuilder, AtlasReplInfo atlasReplInfo) throws SemanticException { InputStream inputStream = null; + long numBytesWritten = 0L; try { AtlasExportRequest exportRequest = atlasRequestBuilder.createExportRequest(atlasReplInfo, atlasReplInfo.getSrcCluster()); inputStream = atlasRestClient.exportData(exportRequest); FileSystem fs = FileSystem.get(atlasReplInfo.getStagingDir().toUri(), atlasReplInfo.getConf()); Path exportFilePath = new Path(atlasReplInfo.getStagingDir(), ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME); - long numBytesWritten = Utils.writeFile(fs, exportFilePath, inputStream); - LOG.info("Wrote to {} ({} bytes)", exportFilePath, numBytesWritten); + numBytesWritten = Utils.writeFile(fs, exportFilePath, inputStream); } catch (SemanticException ex) { throw ex; } catch (Exception ex) { @@ -156,6 +162,7 @@ private void dumpAtlasMetaData(AtlasRequestBuilder atlasRequestBuilder, AtlasRep } } } + return numBytesWritten; } private String checkHiveEntityGuid(AtlasRequestBuilder atlasRequestBuilder, String clusterName, @@ -174,7 +181,7 @@ private String checkHiveEntityGuid(AtlasRequestBuilder atlasRequestBuilder, Stri return guid; } - private void createDumpMetadata(AtlasReplInfo atlasReplInfo, long lastModifiedTime) 
throws SemanticException { + public void createDumpMetadata(AtlasReplInfo atlasReplInfo, long lastModifiedTime) throws SemanticException { Path dumpFile = new Path(atlasReplInfo.getStagingDir(), EximUtil.METADATA_NAME); List<List<String>> listValues = new ArrayList<>(); listValues.add( diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java index fceded5fdc..fa18bf3236 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java @@ -31,6 +31,8 @@ import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.SemanticException; + +import org.apache.hadoop.hive.ql.parse.repl.load.log.AtlasLoadLogger; import org.apache.hadoop.hive.ql.plan.api.StageType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,7 +58,11 @@ public int execute() { AtlasReplInfo atlasReplInfo = createAtlasReplInfo(); LOG.info("Loading atlas metadata from srcDb: {} to tgtDb: {} from staging: {}", atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), atlasReplInfo.getStagingDir()); + AtlasLoadLogger replLogger = new AtlasLoadLogger(atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), + atlasReplInfo.getStagingDir().toString()); + replLogger.startLog(); int importCount = importAtlasMetadata(atlasReplInfo); + replLogger.endLog(importCount); LOG.info("Atlas entities import count {}", importCount); return 0; } catch (Exception e) { @@ -66,7 +72,7 @@ public int execute() { } } - private AtlasReplInfo createAtlasReplInfo() throws SemanticException, MalformedURLException { + public AtlasReplInfo createAtlasReplInfo() throws SemanticException, MalformedURLException { String errorFormat = "%s is mandatory config for Atlas metadata replication"; //Also validates URL for endpoint. 
String endpoint = new URL(ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, conf, errorFormat)) @@ -105,7 +111,7 @@ private String getStoredFsUri(Path atlasDumpDir) throws SemanticException { } } - private int importAtlasMetadata(AtlasReplInfo atlasReplInfo) throws Exception { + public int importAtlasMetadata(AtlasReplInfo atlasReplInfo) throws Exception { AtlasRequestBuilder atlasRequestBuilder = new AtlasRequestBuilder(); AtlasImportRequest importRequest = atlasRequestBuilder.createImportRequest( atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/ReplState.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/ReplState.java index e44115317b..34fd2fe542 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/ReplState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/ReplState.java @@ -46,11 +46,15 @@ TABLE_DUMP, FUNCTION_DUMP, EVENT_DUMP, + ATLAS_DUMP_START, + ATLAS_DUMP_END, RANGER_DUMP_START, RANGER_DUMP_END, TABLE_LOAD, FUNCTION_LOAD, EVENT_LOAD, + ATLAS_LOAD_START, + ATLAS_LOAD_END, RANGER_LOAD_START, RANGER_LOAD_END, END diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/AtlasDumpLogger.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/AtlasDumpLogger.java new file mode 100644 index 0000000000..840c51ebb3 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/AtlasDumpLogger.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.dump.log; + +import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; +import org.apache.hadoop.hive.ql.parse.repl.ReplState.LogTag; +import org.apache.hadoop.hive.ql.parse.repl.dump.log.state.AtlasDumpBegin; +import org.apache.hadoop.hive.ql.parse.repl.dump.log.state.AtlasDumpEnd; + +/** + * AtlasDumpLogger. + * ReplLogger for Atlas metadata dump. + **/ +public class AtlasDumpLogger extends ReplLogger<Long> { + private String dbName; + private String dumpDir; + + public AtlasDumpLogger(String dbName, String dumpDir) { + this.dbName = dbName; + this.dumpDir = dumpDir; + } + + @Override + public void startLog() { + new AtlasDumpBegin(dbName).log(LogTag.ATLAS_DUMP_START); + } + + @Override + public void endLog(Long count) { + new AtlasDumpEnd(dbName, dumpDir).log(LogTag.ATLAS_DUMP_END); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/state/AtlasDumpBegin.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/state/AtlasDumpBegin.java new file mode 100644 index 0000000000..02237c59d7 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/state/AtlasDumpBegin.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.dump.log.state; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import org.apache.hadoop.hive.ql.parse.repl.ReplState; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * AtlasDumpBegin. + * + * ReplState to define Atlas Dump Start. + **/ +public class AtlasDumpBegin extends ReplState { + @SuppressFBWarnings("URF_UNREAD_FIELD") + @JsonProperty + private String dbName; + + @SuppressFBWarnings("URF_UNREAD_FIELD") + @JsonProperty + private Long dumpStartTime; + + public AtlasDumpBegin(String dbName) { + this.dbName = dbName; + this.dumpStartTime = System.currentTimeMillis() / 1000; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/state/AtlasDumpEnd.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/state/AtlasDumpEnd.java new file mode 100644 index 0000000000..074f94d2df --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/state/AtlasDumpEnd.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.dump.log.state; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import org.apache.hadoop.hive.ql.parse.repl.ReplState; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * AtlasDumpEnd. + * + * ReplState to define Atlas Dump End. + **/ +public class AtlasDumpEnd extends ReplState { + @SuppressFBWarnings("URF_UNREAD_FIELD") + @JsonProperty + private String dbName; + + @SuppressFBWarnings("URF_UNREAD_FIELD") + @JsonProperty + private Long dumpEndTime; + + @SuppressFBWarnings("URF_UNREAD_FIELD") + @JsonProperty + private String dumpDir; + + public AtlasDumpEnd(String dbName, String dumpDir) { + this.dbName = dbName; + this.dumpEndTime = System.currentTimeMillis() / 1000; + this.dumpDir = dumpDir; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/log/AtlasLoadLogger.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/log/AtlasLoadLogger.java new file mode 100644 index 0000000000..603683d861 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/log/AtlasLoadLogger.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.load.log; + +import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; +import org.apache.hadoop.hive.ql.parse.repl.ReplState.LogTag; +import org.apache.hadoop.hive.ql.parse.repl.load.log.state.AtlasLoadBegin; +import org.apache.hadoop.hive.ql.parse.repl.load.log.state.AtlasLoadEnd; + +/** + * Repl logger for Atlas metadata load task. + **/ +public class AtlasLoadLogger extends ReplLogger<Integer> { + private String sourceDbName; + private String targetDbName; + private String dumpDir; + + public AtlasLoadLogger(String sourceDbName, String targetDbName, String dumpDir) { + this.sourceDbName = sourceDbName; + this.targetDbName = targetDbName; + this.dumpDir = dumpDir; + } + + @Override + public void startLog() { + new AtlasLoadBegin(sourceDbName, targetDbName).log(LogTag.ATLAS_LOAD_START); + } + + @Override + public void endLog(Integer count) { + new AtlasLoadEnd(sourceDbName, targetDbName, count, dumpDir).log(LogTag.ATLAS_LOAD_END); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/log/state/AtlasLoadBegin.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/log/state/AtlasLoadBegin.java new file mode 100644 index 0000000000..a29881d56f --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/log/state/AtlasLoadBegin.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.load.log.state; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import org.apache.hadoop.hive.ql.parse.repl.ReplState; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * Replication state for Atlas Load Begin. + **/ +public class AtlasLoadBegin extends ReplState { + @SuppressFBWarnings("URF_UNREAD_FIELD") + @JsonProperty + private String sourceDbName; + + @SuppressFBWarnings("URF_UNREAD_FIELD") + @JsonProperty + private String targetDbName; + + @SuppressFBWarnings("URF_UNREAD_FIELD") + @JsonProperty + private Long loadStartTime; + + public AtlasLoadBegin(String sourceDbName, String targetDbName) { + this.sourceDbName = sourceDbName; + this.targetDbName = targetDbName; + this.loadStartTime = System.currentTimeMillis() / 1000; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/log/state/AtlasLoadEnd.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/log/state/AtlasLoadEnd.java new file mode 100644 index 0000000000..c234af4247 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/log/state/AtlasLoadEnd.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.load.log.state; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import org.apache.hadoop.hive.ql.parse.repl.ReplState; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * Replication state for Atlas Load End. + **/ +public class AtlasLoadEnd extends ReplState { + @SuppressFBWarnings("URF_UNREAD_FIELD") + @JsonProperty + private String sourceDbName; + + @SuppressFBWarnings("URF_UNREAD_FIELD") + @JsonProperty + private String targetDbName; + + @SuppressFBWarnings("URF_UNREAD_FIELD") + @JsonProperty + private Long numOfEntities; + + @SuppressFBWarnings("URF_UNREAD_FIELD") + @JsonProperty + private Long loadEndTime; + + @SuppressFBWarnings("URF_UNREAD_FIELD") + @JsonProperty + private String dumpDir; + + public AtlasLoadEnd(String sourceDbName, + String targetDbName, + long numOfEntities, + String dumpDir) { + this.sourceDbName = sourceDbName; + this.targetDbName = targetDbName; + this.numOfEntities = numOfEntities; + this.loadEndTime = System.currentTimeMillis() / 1000; + this.dumpDir = dumpDir; + } +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java new file mode 100644 index 0000000000..dee332f767 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java @@ -0,0 +1,83 @@ +/* + 
* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.repl; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo; +import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRequestBuilder; +import org.apache.hadoop.hive.ql.parse.repl.ReplState; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.reflect.Whitebox; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.mockito.ArgumentMatchers.any; + +/** + * Unit test class for testing Atlas metadata Dump. 
+ */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({LoggerFactory.class}) +public class TestAtlasDumpTask { + + @Mock + private AtlasDumpTask atlasDumpTask; + + @Mock + private HiveConf conf; + + @Test + public void testAtlasDumpMetrics() throws Exception { + AtlasReplInfo atlasReplInfo = new AtlasReplInfo("http://localhost:21000/atlas", "srcDB", + "tgtDb", "srcCluster", "tgtCluster", new Path("hdfs://tmp"), conf); + atlasReplInfo.setSrcFsUri("hdfs://srcFsUri:8020"); + atlasReplInfo.setTgtFsUri("hdfs:tgtFsUri:8020"); + Mockito.when(atlasDumpTask.createAtlasReplInfo()).thenReturn(atlasReplInfo); + Mockito.when(atlasDumpTask.lastStoredTimeStamp()).thenReturn(0L); + Mockito.when(atlasDumpTask.dumpAtlasMetaData(any(AtlasRequestBuilder.class), any(AtlasReplInfo.class))) + .thenReturn(0L); + Mockito.when(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL)).thenReturn(true); + Logger logger = Mockito.mock(Logger.class); + Whitebox.setInternalState(ReplState.class, logger); + Mockito.when(atlasDumpTask.execute()).thenCallRealMethod(); + int status = atlasDumpTask.execute(); + Assert.assertEquals(0, status); + ArgumentCaptor replStateCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(Object.class); + ArgumentCaptor eventDetailsCaptor = ArgumentCaptor.forClass(Object.class); + Mockito.verify(logger, + Mockito.times(2)).info(replStateCaptor.capture(), + eventCaptor.capture(), eventDetailsCaptor.capture()); + Assert.assertEquals("REPL::{}: {}", replStateCaptor.getAllValues().get(0)); + Assert.assertEquals("ATLAS_DUMP_START", eventCaptor.getAllValues().get(0)); + Assert.assertEquals("ATLAS_DUMP_END", eventCaptor.getAllValues().get(1)); + Assert.assertTrue(eventDetailsCaptor.getAllValues().get(1).toString(), eventDetailsCaptor.getAllValues().get(0) + .toString().contains("{\"dbName\":\"srcDB\",\"dumpStartTime")); + Assert.assertTrue(eventDetailsCaptor + 
.getAllValues().get(1).toString().contains("{\"dbName\":\"srcDB\",\"dumpEndTime\"")); + } +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasLoadTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasLoadTask.java new file mode 100644 index 0000000000..bb5fe0b352 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasLoadTask.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.repl; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo; +import org.apache.hadoop.hive.ql.parse.repl.ReplState; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.powermock.reflect.Whitebox; +import org.slf4j.Logger; + +/** + * Unit test class for testing Atlas metadata load. 
+ */ +@RunWith(MockitoJUnitRunner.class) +public class TestAtlasLoadTask { + @Mock + private AtlasLoadTask atlasLoadTask; + + @Mock + private HiveConf conf; + + @Test + public void testAtlasLoadMetrics() throws Exception { + AtlasReplInfo atlasReplInfo = new AtlasReplInfo("http://localhost:21000/atlas", "srcDB", + "tgtDB", "srcCluster", "tgtCluster", new Path("hdfs://tmp"), conf); + atlasReplInfo.setSrcFsUri("hdfs://srcFsUri:8020"); + atlasReplInfo.setTgtFsUri("hdfs:tgtFsUri:8020"); + Mockito.when(atlasLoadTask.createAtlasReplInfo()).thenReturn(atlasReplInfo); + Mockito.when(atlasLoadTask.importAtlasMetadata(atlasReplInfo)).thenReturn(1); + Logger logger = Mockito.mock(Logger.class); + Whitebox.setInternalState(ReplState.class, logger); + Mockito.when(atlasLoadTask.execute()).thenCallRealMethod(); + int status = atlasLoadTask.execute(); + Assert.assertEquals(0, status); + ArgumentCaptor replStateCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(Object.class); + ArgumentCaptor eventDetailsCaptor = ArgumentCaptor.forClass(Object.class); + Mockito.verify(logger, + Mockito.times(2)).info(replStateCaptor.capture(), + eventCaptor.capture(), eventDetailsCaptor.capture()); + Assert.assertEquals("REPL::{}: {}", replStateCaptor.getAllValues().get(0)); + Assert.assertEquals("ATLAS_LOAD_START", eventCaptor.getAllValues().get(0)); + Assert.assertEquals("ATLAS_LOAD_END", eventCaptor.getAllValues().get(1)); + Assert.assertTrue(eventDetailsCaptor.getAllValues().get(0) + .toString().contains("{\"sourceDbName\":\"srcDB\",\"targetDbName\":\"tgtDB\",\"loadStartTime\":")); + Assert.assertTrue(eventDetailsCaptor + .getAllValues().get(1).toString().contains("{\"sourceDbName\":\"srcDB\",\"targetDbName\"" + + ":\"tgtDB\",\"numOfEntities\":1,\"loadEndTime\"")); + } +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java index 
0559d1ba88..af41e3d773 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java @@ -44,7 +44,7 @@ import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.RANGER_REST_URL; /** - * Unit test class for testing Ranger Dump. + * Unit test class for testing Ranger Load. */ @RunWith(MockitoJUnitRunner.class) public class TestRangerLoadTask {