diff --git a/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/JIRAService.java b/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/JIRAService.java
index c7be572..37127ea 100644
--- a/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/JIRAService.java
+++ b/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/JIRAService.java
@@ -23,6 +23,7 @@
 import java.net.URL;
 import java.util.*;
 
+import com.google.common.collect.Sets;
 import org.apache.commons.cli.*;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.hive.ptest.api.server.TestLogger;
@@ -94,7 +95,12 @@ public JIRAService(Logger logger, TestConfiguration configuration, String buildT
   }
 
   void postComment(boolean error, int numTestsExecuted, SortedSet<String> failedTests,
-    List<String> messages) {
+      List<String> messages) {
+    postComment(error, numTestsExecuted, failedTests, messages, new HashSet<String>());
+  }
+
+  void postComment(boolean error, int numTestsExecuted, SortedSet<String> failedTests,
+      List<String> messages, Set<String> addedTests) {
     DefaultHttpClient httpClient = new DefaultHttpClient();
     try {
       BuildInfo buildInfo = formatBuildTag(mBuildTag);
@@ -102,9 +108,9 @@ void postComment(boolean error, int numTestsExecuted, SortedSet<String> failedTe
       List<String> comments = Lists.newArrayList();
       comments.add("");
       comments.add("");
-      if(!failedTests.isEmpty()) {
+      if (!failedTests.isEmpty()) {
         comments.add("{color:red}Overall{color}: -1 at least one tests failed");
-      } else if(numTestsExecuted == 0) {
+      } else if (numTestsExecuted == 0) {
         comments.add("{color:red}Overall{color}: -1 no tests executed");
       } else if (error) {
         comments.add("{color:red}Overall{color}: -1 build exited with an error");
@@ -112,17 +118,23 @@ void postComment(boolean error, int numTestsExecuted, SortedSet<String> failedTe
         comments.add("{color:green}Overall{color}: +1 all checks pass");
       }
       comments.add("");
-      if(!mPatch.isEmpty()) {
+      if (!mPatch.isEmpty()) {
         comments.add("Here are the results of testing the latest attachment:");
         comments.add(mPatch);
       }
       comments.add("");
-      if(numTestsExecuted > 0) {
+      if (addedTests.size() > 0) {
+        comments.add(formatSuccess("+1 due to " + addedTests.size() + " test(s) being added or modified."));
+      } else {
+        comments.add(formatError("-1 due to no test(s) being added or modified."));
+      }
+      comments.add("");
+      if (numTestsExecuted > 0) {
         if (failedTests.isEmpty()) {
-          comments.add(formatSuccess("+1 "+ numTestsExecuted + " tests passed"));
+          comments.add(formatSuccess("+1 " + numTestsExecuted + " tests passed"));
        } else {
          comments.add(formatError("-1 due to " + failedTests.size()
-           + " failed/errored test(s), " + numTestsExecuted + " tests executed"));
+             + " failed/errored test(s), " + numTestsExecuted + " tests executed"));
          comments.add("*Failed tests:*");
          comments.add("{noformat}");
          comments.addAll(failedTests);
@@ -131,12 +143,12 @@ void postComment(boolean error, int numTestsExecuted, SortedSet<String> failedTe
        comments.add("");
      }
      comments.add("Test results: " + mJenkinsURL + "/" +
-       buildInfo.getFormattedBuildTag() + "/testReport");
+         buildInfo.getFormattedBuildTag() + "/testReport");
      comments.add("Console output: " + mJenkinsURL + "/" +
-       buildInfo.getFormattedBuildTag() + "/console");
+         buildInfo.getFormattedBuildTag() + "/console");
      comments.add("Test logs: " + mLogsURL + buildTagForLogs);
      comments.add("");
-     if(!messages.isEmpty()) {
+     if (!messages.isEmpty()) {
        comments.add("Messages:");
        comments.add("{noformat}");
        comments.addAll(trimMessages(messages));
@@ -147,16 +159,16 @@ void postComment(boolean error, int numTestsExecuted, SortedSet<String> failedTe
      String attachmentId = parseAttachementId(mPatch);
      comments.add("");
      comments.add("ATTACHMENT ID: " + attachmentId +
-       " - " + buildInfo.getBuildName());
+         " - " + buildInfo.getBuildName());
      mLogger.info("Comment: " + Joiner.on("\n").join(comments));
      String body = Joiner.on("\n").join(comments);
      String url = String.format("%s/rest/api/2/issue/%s/comment", mUrl, mName);
      URL apiURL = new URL(mUrl);
      httpClient.getCredentialsProvider()
-      .setCredentials(
+        .setCredentials(
          new AuthScope(apiURL.getHost(), apiURL.getPort(),
-          AuthScope.ANY_REALM),
-      new UsernamePasswordCredentials(mUser, mPassword));
+            AuthScope.ANY_REALM),
+        new UsernamePasswordCredentials(mUser, mPassword));
      BasicHttpContext localcontext = new BasicHttpContext();
      localcontext.setAttribute("preemptive-auth", new BasicScheme());
      httpClient.addRequestInterceptor(new PreemptiveAuth(), 0);
@@ -169,36 +181,42 @@ void postComment(boolean error, int numTestsExecuted, SortedSet<String> failedTe
      StatusLine statusLine = httpResponse.getStatusLine();
      if (statusLine.getStatusCode() != 201) {
        throw new RuntimeException(statusLine.getStatusCode() + " "
-         + statusLine.getReasonPhrase());
+           + statusLine.getReasonPhrase());
      }
      mLogger.info("JIRA Response Metadata: " + httpResponse);
    } catch (Exception e) {
      mLogger.error("Encountered error attempting to post comment to " + mName,
-       e);
+         e);
    } finally {
      httpClient.getConnectionManager().shutdown();
    }
  }
+
  static List<String> trimMessages(List<String> messages) {
    int size = messages.size();
-   if(size > MAX_MESSAGES) {
+   if (size > MAX_MESSAGES) {
      messages = messages.subList(size - MAX_MESSAGES, size);
      messages.add(0, TRIMMED_MESSAGE);
    }
    return messages;
  }
+
  @SuppressWarnings("unused")
  private static class Body {
    private String body;
+
    public Body() {
    }
+
    public Body(String body) {
      this.body = body;
    }
+
    public String getBody() {
      return body;
    }
+
    public void setBody(String body) {
      this.body = body;
    }
@@ -209,7 +227,7 @@ public void setBody(String body) {
    private String buildName;
    private String formattedBuildTag;
 
-   public BuildInfo (String buildName, String formattedBuildTag) {
+   public BuildInfo(String buildName, String formattedBuildTag) {
      this.buildName = buildName;
      this.formattedBuildTag = formattedBuildTag;
    }
@@ -228,7 +246,7 @@ public String getFormattedBuildTag() {
    */
   @VisibleForTesting
   static BuildInfo formatBuildTag(String buildTag) {
-    if(buildTag.contains("-")) {
+    if (buildTag.contains("-")) {
       int lastDashIndex = buildTag.lastIndexOf("-");
       String buildName = buildTag.substring(0, lastDashIndex);
       String buildId = buildTag.substring(lastDashIndex + 1);
@@ -237,6 +255,7 @@ static BuildInfo formatBuildTag(String buildTag) {
     }
     throw new IllegalArgumentException("Build tag '" + buildTag + "' must contain a -");
   }
+
   static String formatBuildTagForLogs(String buildTag) {
     if (buildTag.endsWith("/")) {
       return buildTag;
@@ -244,6 +263,7 @@ static String formatBuildTagForLogs(String buildTag) {
       return buildTag + "/";
     }
   }
+
   private static String formatError(String msg) {
     return String.format("{color:red}ERROR:{color} %s", msg);
   }
@@ -255,7 +275,7 @@ private static String formatSuccess(String msg) {
   static class PreemptiveAuth implements HttpRequestInterceptor {
 
     public void process(final HttpRequest request, final HttpContext context)
-      throws HttpException, IOException {
+        throws HttpException, IOException {
       AuthState authState = (AuthState) context.getAttribute(ClientContext.TARGET_AUTH_STATE);
       if (authState.getAuthScheme() == null) {
         AuthScheme authScheme = (AuthScheme) context.getAttribute("preemptive-auth");
@@ -263,34 +283,35 @@ public void process(final HttpRequest request, final HttpContext context)
         HttpHost targetHost = (HttpHost) context.getAttribute(ExecutionContext.HTTP_TARGET_HOST);
         if (authScheme != null) {
           Credentials creds = credsProvider.getCredentials(new AuthScope(
-            targetHost.getHostName(), targetHost.getPort()));
+              targetHost.getHostName(), targetHost.getPort()));
           if (creds == null) {
             throw new HttpException(
-              "No credentials for preemptive authentication");
+                "No credentials for preemptive authentication");
           }
           authState.update(authScheme, creds);
         }
       }
     }
   }
+
   private static String parseAttachementId(String patch) {
-    if(patch == null) {
+    if (patch == null) {
       return "";
     }
     String result = FilenameUtils.getPathNoEndSeparator(patch.trim());
-    if(result == null) {
+    if (result == null) {
       return "";
     }
     result = FilenameUtils.getName(result.trim());
-    if(result == null) {
+    if (result == null) {
       return "";
     }
     return result.trim();
   }
 
   private static void assertRequired(CommandLine commandLine, String[] requiredOptions) throws IllegalArgumentException {
-    for(String requiredOption : requiredOptions) {
-      if(!commandLine.hasOption(requiredOption)) {
+    for (String requiredOption : requiredOptions) {
+      if (!commandLine.hasOption(requiredOption)) {
         throw new IllegalArgumentException("--" + requiredOption + " is required");
       }
     }
@@ -311,7 +332,7 @@ private static void assertRequired(CommandLine commandLine, String[] requiredOpt
   private static final String FIELD_FAILED_TESTS = "failedTests";
   private static final String FIELD_MESSAGES = "messages";
   private static final String FIELD_JIRA_USER = "jiraUser";
-  private static final String FIELD_JIRA_PASS= "jiraPassword";
+  private static final String FIELD_JIRA_PASS = "jiraPassword";
 
   private static Map<String, Class<?>> supportedJsonFields = new HashMap<String, Class<?>>() {
     {
@@ -387,9 +408,9 @@ private static CommandLine parseCommandLine(String[] args) throws ParseException
     }
 
     assertRequired(cmd, new String[]{
-      OPT_USER_LONG,
-      OPT_PASS_LONG,
-      OPT_FILE_LONG
+        OPT_USER_LONG,
+        OPT_PASS_LONG,
+        OPT_FILE_LONG
     });
 
     return cmd;
@@ -400,7 +421,7 @@ public static void main(String[] args) throws Exception {
 
     try {
       cmd = parseCommandLine(args);
-    } catch(ParseException e) {
+    } catch (ParseException e) {
       System.out.println("Error parsing command arguments: " + e.getMessage());
       System.exit(1);
     }
@@ -413,25 +434,25 @@ public static void main(String[] args) throws Exception {
     Map<String, Object> jsonValues = parseJsonFile(cmd.getOptionValue(OPT_FILE_LONG));
 
     Map<String, String> context = Maps.newHashMap();
-    context.put(FIELD_JIRA_URL, (String)jsonValues.get(FIELD_JIRA_URL));
+    context.put(FIELD_JIRA_URL, (String) jsonValues.get(FIELD_JIRA_URL));
     context.put(FIELD_JIRA_USER, cmd.getOptionValue(OPT_USER_LONG));
     context.put(FIELD_JIRA_PASS, cmd.getOptionValue(OPT_PASS_LONG));
-    context.put(FIELD_LOGS_URL, (String)jsonValues.get(FIELD_LOGS_URL));
-    context.put(FIELD_REPO, (String)jsonValues.get(FIELD_REPO));
-    context.put(FIELD_REPO_NAME, (String)jsonValues.get(FIELD_REPO_NAME));
-    context.put(FIELD_REPO_TYPE, (String)jsonValues.get(FIELD_REPO_TYPE));
-    context.put(FIELD_REPO_BRANCH, (String)jsonValues.get(FIELD_REPO_BRANCH));
-    context.put(FIELD_JENKINS_URL, (String)jsonValues.get(FIELD_JENKINS_URL));
+    context.put(FIELD_LOGS_URL, (String) jsonValues.get(FIELD_LOGS_URL));
+    context.put(FIELD_REPO, (String) jsonValues.get(FIELD_REPO));
+    context.put(FIELD_REPO_NAME, (String) jsonValues.get(FIELD_REPO_NAME));
+    context.put(FIELD_REPO_TYPE, (String) jsonValues.get(FIELD_REPO_TYPE));
+    context.put(FIELD_REPO_BRANCH, (String) jsonValues.get(FIELD_REPO_BRANCH));
+    context.put(FIELD_JENKINS_URL, (String) jsonValues.get(FIELD_JENKINS_URL));
 
     TestLogger logger = new TestLogger(System.err, TestLogger.LEVEL.TRACE);
     TestConfiguration configuration = new TestConfiguration(new Context(context), logger);
-    configuration.setJiraName((String)jsonValues.get(FIELD_JIRA_NAME));
-    configuration.setPatch((String)jsonValues.get(FIELD_PATCH_URL));
-
-    JIRAService service = new JIRAService(logger, configuration, (String)jsonValues.get(FIELD_BUILD_TAG));
-    List<String> messages = (List<String>)jsonValues.get(FIELD_MESSAGES);
-    SortedSet<String> failedTests = (SortedSet<String>)jsonValues.get(FIELD_FAILED_TESTS);
-    boolean error = (Integer)jsonValues.get(FIELD_BUILD_STATUS) == 0 ? false : true;
-    service.postComment(error, (Integer)jsonValues.get(FIELD_NUM_TESTS_EXECUTED), failedTests, messages);
+    configuration.setJiraName((String) jsonValues.get(FIELD_JIRA_NAME));
+    configuration.setPatch((String) jsonValues.get(FIELD_PATCH_URL));
+
+    JIRAService service = new JIRAService(logger, configuration, (String) jsonValues.get(FIELD_BUILD_TAG));
+    List<String> messages = (List<String>) jsonValues.get(FIELD_MESSAGES);
+    SortedSet<String> failedTests = (SortedSet<String>) jsonValues.get(FIELD_FAILED_TESTS);
+    boolean error = (Integer) jsonValues.get(FIELD_BUILD_STATUS) == 0 ? false : true;
+    service.postComment(error, (Integer) jsonValues.get(FIELD_NUM_TESTS_EXECUTED), failedTests, messages);
   }
 }
\ No newline at end of file
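Reviewer note: the hunks above change the JIRA vote so that a patch touching no test files draws an explicit -1 line before the usual pass/fail tally, independent of test results. A minimal standalone sketch of that vote logic (class and method names here are illustrative; formatSuccess is assumed to mirror the formatError helper shown above with a green SUCCESS label):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;

    class VoteSketch {
      // Mirrors the block added to postComment(): the added/modified-test vote
      // is emitted whether or not any tests ran or failed.
      static List<String> voteLines(Set<String> addedTests, int numTestsExecuted,
          Set<String> failedTests) {
        List<String> comments = new ArrayList<String>();
        if (addedTests.size() > 0) {
          comments.add("{color:green}SUCCESS:{color} +1 due to " + addedTests.size()
              + " test(s) being added or modified.");
        } else {
          comments.add("{color:red}ERROR:{color} -1 due to no test(s) being added or modified.");
        }
        if (numTestsExecuted > 0 && failedTests.isEmpty()) {
          comments.add("{color:green}SUCCESS:{color} +1 " + numTestsExecuted + " tests passed");
        }
        return comments;
      }

      public static void main(String[] args) {
        Set<String> none = new java.util.HashSet<String>();
        // Prints the -1 "no tests added" line followed by the +1 "tests passed" line.
        System.out.println(voteLines(none, 42, none));
      }
    }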
diff --git a/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/PTest.java b/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/PTest.java
index 7217ef9..35cc752 100644
--- a/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/PTest.java
+++ b/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/PTest.java
@@ -23,6 +23,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -72,6 +73,7 @@
   private final TestConfiguration mConfiguration;
   private final ListeningExecutorService mExecutor;
+  private final Set<String> mAddedTests;
   private final Set<String> mExecutedTests;
   private final Set<String> mFailedTests;
   private final List<Phase> mPhases;
@@ -92,6 +94,7 @@ public PTest(final TestConfiguration configuration, final ExecutionContext execu
     mBuildTag = buildTag;
     mExecutedTests = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
     mFailedTests = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+    mAddedTests = new HashSet<String>();
     mExecutionContext = executionContext;
     mSshCommandExecutor = sshCommandExecutor;
     mRsyncCommandExecutor = rsyncCommandExecutor;
@@ -148,6 +151,7 @@ public HostExecutor build(Host host) {
     }
     mHostExecutors = new CopyOnWriteArrayList<HostExecutor>(hostExecutors);
     mPhases = Lists.newArrayList();
+    mPhases.add(new TestCheckPhase(mHostExecutors, localCommandFactory, templateDefaults, patchFile, logger, mAddedTests));
     mPhases.add(new PrepPhase(mHostExecutors, localCommandFactory, templateDefaults, scratchDir, patchFile, logger));
     mPhases.add(new ExecutionPhase(mHostExecutors, mExecutionContext, hostExecutorBuilder, localCommandFactory, templateDefaults,
       succeededLogDir, failedLogDir, testParser.parse(), mExecutedTests, mFailedTests, logger));
@@ -213,7 +217,7 @@ public int run() {
     for(Map.Entry<String, Long> entry : elapsedTimes.entrySet()) {
       mLogger.info(String.format("PERF: Phase %s took %d minutes", entry.getKey(), entry.getValue()));
     }
-    publishJiraComment(error, messages, failedTests);
+    publishJiraComment(error, messages, failedTests, mAddedTests);
     if(error || !mFailedTests.isEmpty()) {
       result = 1;
     }
@@ -221,7 +225,7 @@ public int run() {
     return result;
   }
 
-  private void publishJiraComment(boolean error, List<String> messages, SortedSet<String> failedTests) {
+  private void publishJiraComment(boolean error, List<String> messages, SortedSet<String> failedTests, Set<String> addedTests) {
     if(mConfiguration.getJiraName().isEmpty()) {
       mLogger.info("Skipping JIRA comment as name is empty.");
       return;
@@ -238,8 +242,9 @@ private void publishJiraComment(boolean error, List<String> messages, SortedSet<
       mLogger.info("Skipping JIRA comment as password is empty.");
       return;
     }
+    mLogger.info("Added tests: " + addedTests);
     JIRAService jira = new JIRAService(mLogger, mConfiguration, mBuildTag);
-    jira.postComment(error, mExecutedTests.size(), failedTests, messages);
+    jira.postComment(error, mExecutedTests.size(), failedTests, messages, addedTests);
   }
 
   public static class Builder {
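TestCheckPhase is registered ahead of PrepPhase, so the added-test scan finishes before anything runs and mAddedTests is populated by the time publishJiraComment fires. A toy model of that ordering (names and the Phase interface here are illustrative, not the real ptest API):

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    class PipelineSketch {
      interface Phase { void execute() throws Exception; }

      public static void main(String[] args) throws Exception {
        final Set<String> addedTests = new HashSet<String>();
        List<Phase> phases = new ArrayList<Phase>();
        phases.add(new Phase() {            // stands in for TestCheckPhase
          public void execute() { addedTests.add("TestFoo.java"); }
        });
        phases.add(new Phase() {            // stands in for PrepPhase/ExecutionPhase
          public void execute() { /* prep workspace, run tests */ }
        });
        for (Phase p : phases) {            // PTest.run() walks phases in order
          p.execute();
        }
        // By this point the set is filled, mirroring publishJiraComment(..., addedTests).
        System.out.println("Added tests: " + addedTests);
      }
    }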
diff --git a/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/TestCheckPhase.java b/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/TestCheckPhase.java
new file mode 100644
index 0000000..1107dcd
--- /dev/null
+++ b/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/TestCheckPhase.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.ptest.execution;
+
+import com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class TestCheckPhase extends Phase {
+  private final File mPatchFile;
+  private Set<String> modifiedTestFiles;
+
+  private static final Pattern fileNameFromDiff = Pattern.compile("[/][^\\s]*");
+  private static final Pattern javaTest = Pattern.compile("Test.*java");
+
+  public TestCheckPhase(List<HostExecutor> hostExecutors,
+    LocalCommandFactory localCommandFactory,
+    ImmutableMap<String, String> templateDefaults,
+    File patchFile, Logger logger, Set<String> modifiedTestFiles) {
+    super(hostExecutors, localCommandFactory, templateDefaults, logger);
+    this.mPatchFile = patchFile;
+    this.modifiedTestFiles = modifiedTestFiles;
+  }
+  @Override
+  public void execute() throws Exception {
+    if(mPatchFile != null) {
+      logger.info("Reading patchfile " + mPatchFile.getAbsolutePath());
+      FileReader fr = null;
+      try {
+        fr = new FileReader(mPatchFile);
+        BufferedReader br = new BufferedReader(fr);
+        String line;
+        while ((line = br.readLine()) != null) {
+          if(line.startsWith("+++")) {
+            logger.info("Searching line : " + line);
+            Matcher fileNameMatcher = fileNameFromDiff.matcher(line);
+            if (fileNameMatcher.find()) {
+              String filePath = fileNameMatcher.group(0);
+              String fileName = filePath.substring(filePath.lastIndexOf("/") + 1);
+              Matcher javaTestMatcher = javaTest.matcher(fileName);
+              if (javaTestMatcher.find() || fileName.endsWith(".q")) {
+                modifiedTestFiles.add(fileName);
+              }
+            }
+          }
+        }
+      } finally {
+        if (fr != null) { fr.close(); }
+      }
+    } else {
+      logger.error("Patch file is null");
+    }
+  }
+}
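The phase recognizes test files purely by name: it pulls the path out of each "+++" header with [/][^\s]*, keeps the basename, and flags it when Test.*java matches or the name ends in .q. A self-contained check of that classification, using the same two patterns (the example diff headers are hypothetical):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    class TestFileCheckDemo {
      private static final Pattern fileNameFromDiff = Pattern.compile("[/][^\\s]*");
      private static final Pattern javaTest = Pattern.compile("Test.*java");

      // Returns the matched test file name, or null when the "+++" line does
      // not name a Test*.java or *.q file.
      static String matchedTestFile(String plusPlusPlusLine) {
        Matcher m = fileNameFromDiff.matcher(plusPlusPlusLine);
        if (!m.find()) {
          return null;
        }
        String filePath = m.group(0);
        String fileName = filePath.substring(filePath.lastIndexOf("/") + 1);
        if (javaTest.matcher(fileName).find() || fileName.endsWith(".q")) {
          return fileName;
        }
        return null;
      }

      public static void main(String[] args) {
        // Prints "foo.q": a qfile counts as a test.
        System.out.println(matchedTestFile("+++ b/ql/src/test/queries/clientpositive/foo.q"));
        // Prints "null": an ordinary source file does not.
        System.out.println(matchedTestFile("+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java"));
      }
    }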
diff --git a/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/TestTestCheckPhase.java b/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/TestTestCheckPhase.java
new file mode 100644
index 0000000..7183ee3
--- /dev/null
+++ b/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/TestTestCheckPhase.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.ptest.execution;
+
+import org.approvaltests.Approvals;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.URL;
+import java.util.HashSet;
+import java.util.Set;
+
+public class TestTestCheckPhase extends AbstractTestPhase {
+  private TestCheckPhase phase;
+
+  @Before
+  public void setup() throws Exception {
+    initialize(getClass().getSimpleName());
+    createHostExecutor();
+  }
+  @Test
+  public void testNoTests() throws Exception {
+    URL url = this.getClass().getResource("/HIVE-9377.1.patch");
+    File patchFile = new File(url.getFile());
+    Set<String> addedTests = new HashSet<String>();
+    phase = new TestCheckPhase(hostExecutors, localCommandFactory,
+      templateDefaults, patchFile, logger, addedTests);
+    phase.execute();
+
+    Assert.assertEquals(addedTests.size(), 0);
+  }
+
+
+  @Test
+  public void testJavaTests() throws Exception {
+    URL url = this.getClass().getResource("/HIVE-10761.6.patch");
+    File patchFile = new File(url.getFile());
+    Set<String> addedTests = new HashSet<String>();
+    phase = new TestCheckPhase(hostExecutors, localCommandFactory,
+      templateDefaults, patchFile, logger, addedTests);
+    phase.execute();
+
+    Assert.assertEquals(addedTests.size(), 3);
+    Assert.assertTrue(addedTests.contains("TestCodahaleMetrics.java"));
+    Assert.assertTrue(addedTests.contains("TestMetaStoreMetrics.java"));
+    Assert.assertTrue(addedTests.contains("TestLegacyMetrics.java"));
+  }
+
+  @Test
+  public void testQTests() throws Exception {
+    URL url = this.getClass().getResource("/HIVE-11271.4.patch");
+    File patchFile = new File(url.getFile());
+    Set<String> addedTests = new HashSet<String>();
+    phase = new TestCheckPhase(hostExecutors, localCommandFactory,
+      templateDefaults, patchFile, logger, addedTests);
+    phase.execute();
+
+    Assert.assertEquals(addedTests.size(), 1);
+    Assert.assertTrue(addedTests.contains("unionall_unbalancedppd.q"));
+  }
+
+  @Test
+  public void testRemoveTest() throws Exception {
+    URL url = this.getClass().getResource("/remove-test.patch");
+    File patchFile = new File(url.getFile());
+    Set<String> addedTests = new HashSet<String>();
+    phase = new TestCheckPhase(hostExecutors, localCommandFactory,
+      templateDefaults, patchFile, logger, addedTests);
+    phase.execute();
+
+    Assert.assertEquals(addedTests.size(), 0);
+  }
+}
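The patches that follow are checked in under src/test/resources as the fixtures these tests read. Assuming the module's standard Surefire setup, the tests above can be exercised on their own with, e.g.:

    cd testutils/ptest2
    mvn test -Dtest=TestTestCheckPhase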
diff --git a/testutils/ptest2/src/test/resources/HIVE-10761.6.patch b/testutils/ptest2/src/test/resources/HIVE-10761.6.patch
new file mode 100644
index 0000000..5b41850
--- /dev/null
+++ b/testutils/ptest2/src/test/resources/HIVE-10761.6.patch
@@ -0,0 +1,2539 @@
+diff --git a/common/pom.xml b/common/pom.xml
+index a615c1e..8d4b1ea 100644
+--- a/common/pom.xml
++++ b/common/pom.xml
+@@ -98,6 +98,26 @@
+       <artifactId>json</artifactId>
+       <version>${json.version}</version>
+     </dependency>
++    <dependency>
++      <groupId>io.dropwizard.metrics</groupId>
++      <artifactId>metrics-core</artifactId>
++      <version>${dropwizard.version}</version>
++    </dependency>
++    <dependency>
++      <groupId>io.dropwizard.metrics</groupId>
++      <artifactId>metrics-jvm</artifactId>
++      <version>${dropwizard.version}</version>
++    </dependency>
++    <dependency>
++      <groupId>io.dropwizard.metrics</groupId>
++      <artifactId>metrics-json</artifactId>
++      <version>${dropwizard.version}</version>
++    </dependency>
++    <dependency>
++      <groupId>com.fasterxml.jackson.core</groupId>
++      <artifactId>jackson-databind</artifactId>
++      <version>${jackson.new.version}</version>
++    </dependency>
+
+
+diff --git a/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java b/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
+new file mode 100644
+index 0000000..c3949f2
+--- /dev/null
++++ b/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
+@@ -0,0 +1,225 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hadoop.hive.common;
++
++import com.google.common.base.Joiner;
++import com.google.common.base.Preconditions;
++import com.google.common.base.Stopwatch;
++import com.google.common.collect.Lists;
++import com.google.common.collect.Maps;
++import com.google.common.collect.Sets;
++import org.apache.commons.logging.Log;
++import org.apache.commons.logging.LogFactory;
++import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
++import org.apache.hadoop.util.Daemon;
++
++import java.lang.management.GarbageCollectorMXBean;
++import java.lang.management.ManagementFactory;
++import java.util.List;
++import java.util.Map;
++import java.util.Set;
++
++/**
++ * Based on the JvmPauseMonitor from Hadoop.
++ */
++public class JvmPauseMonitor {
++  private static final Log LOG = LogFactory.getLog(
++    JvmPauseMonitor.class);
++
++  /** The target sleep time */
++  private static final long SLEEP_INTERVAL_MS = 500;
++
++  /** log WARN if we detect a pause longer than this threshold */
++  private final long warnThresholdMs;
++  private static final String WARN_THRESHOLD_KEY =
++    "jvm.pause.warn-threshold.ms";
++  private static final long WARN_THRESHOLD_DEFAULT = 10000;
++
++  /** log INFO if we detect a pause longer than this threshold */
++  private final long infoThresholdMs;
++  private static final String INFO_THRESHOLD_KEY =
++    "jvm.pause.info-threshold.ms";
++  private static final long INFO_THRESHOLD_DEFAULT = 1000;
++
++  private long numGcWarnThresholdExceeded = 0;
++  private long numGcInfoThresholdExceeded = 0;
++  private long totalGcExtraSleepTime = 0;
++
++  private Thread monitorThread;
++  private volatile boolean shouldRun = true;
++
++  public JvmPauseMonitor(Configuration conf) {
++    this.warnThresholdMs = conf.getLong(WARN_THRESHOLD_KEY, WARN_THRESHOLD_DEFAULT);
++    this.infoThresholdMs = conf.getLong(INFO_THRESHOLD_KEY, INFO_THRESHOLD_DEFAULT);
++  }
++
++  public void start() {
++    Preconditions.checkState(monitorThread == null,
++      "JvmPauseMonitor thread is Already started");
++    monitorThread = new Daemon(new Monitor());
++    monitorThread.start();
++  }
++
++  public void stop() {
++    shouldRun = false;
++    if (isStarted()) {
++      monitorThread.interrupt();
++      try {
++        monitorThread.join();
++      } catch (InterruptedException e) {
++        Thread.currentThread().interrupt();
++      }
++    }
++  }
++
++  public boolean isStarted() {
++    return monitorThread != null;
++  }
++
++  public long getNumGcWarnThreadholdExceeded() {
++    return numGcWarnThresholdExceeded;
++  }
++
++  public long getNumGcInfoThresholdExceeded() {
++    return numGcInfoThresholdExceeded;
++  }
++
++  public long getTotalGcExtraSleepTime() {
++    return totalGcExtraSleepTime;
++  }
++
++  private String formatMessage(long extraSleepTime,
++    Map<String, GcTimes> gcTimesAfterSleep,
++    Map<String, GcTimes> gcTimesBeforeSleep) {
++
++    Set<String> gcBeanNames = Sets.intersection(
++      gcTimesAfterSleep.keySet(),
++      gcTimesBeforeSleep.keySet());
++    List<String> gcDiffs = Lists.newArrayList();
++    for (String name : gcBeanNames) {
++      GcTimes diff = gcTimesAfterSleep.get(name).subtract(
++        gcTimesBeforeSleep.get(name));
++      if (diff.gcCount != 0) {
++        gcDiffs.add("GC pool '" + name + "' had collection(s): " +
++          diff.toString());
++      }
++    }
++
++    String ret = "Detected pause in JVM or host machine (eg GC): " +
++      "pause of approximately " + extraSleepTime + "ms\n";
++    if (gcDiffs.isEmpty()) {
++      ret += "No GCs detected";
++    } else {
++      ret += Joiner.on("\n").join(gcDiffs);
++    }
++    return ret;
++  }
++
++  private Map<String, GcTimes> getGcTimes() {
++    Map<String, GcTimes> map = Maps.newHashMap();
++    List<GarbageCollectorMXBean> gcBeans =
++      ManagementFactory.getGarbageCollectorMXBeans();
++    for (GarbageCollectorMXBean gcBean : gcBeans) {
++      map.put(gcBean.getName(), new GcTimes(gcBean));
++    }
++    return map;
++  }
++
++  private static class GcTimes {
++    private GcTimes(GarbageCollectorMXBean gcBean) {
++      gcCount = gcBean.getCollectionCount();
++      gcTimeMillis = gcBean.getCollectionTime();
++    }
++
++    private GcTimes(long count, long time) {
++      this.gcCount = count;
++      this.gcTimeMillis = time;
++    }
++
++    private GcTimes subtract(GcTimes other) {
++      return new GcTimes(this.gcCount - other.gcCount,
++        this.gcTimeMillis - other.gcTimeMillis);
++    }
++
++    @Override
++    public String toString() {
++      return "count=" + gcCount + " time=" + gcTimeMillis + "ms";
++    }
++
++    private long gcCount;
++    private long gcTimeMillis;
++  }
++
++  private class Monitor implements Runnable {
++    @Override
++    public void run() {
++      Stopwatch sw = new Stopwatch();
++      Map<String, GcTimes> gcTimesBeforeSleep = getGcTimes();
++      while (shouldRun) {
++        sw.reset().start();
++        try {
++          Thread.sleep(SLEEP_INTERVAL_MS);
++        } catch (InterruptedException ie) {
++          return;
++        }
++        long extraSleepTime = sw.elapsedMillis() - SLEEP_INTERVAL_MS;
++        Map<String, GcTimes> gcTimesAfterSleep = getGcTimes();
++
++        if (extraSleepTime > warnThresholdMs) {
++          ++numGcWarnThresholdExceeded;
++          LOG.warn(formatMessage(
++            extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep));
++          incrementMetricsCounter("jvm.pause.warn-threshold", 1);
++        } else if (extraSleepTime > infoThresholdMs) {
++          ++numGcInfoThresholdExceeded;
++          LOG.info(formatMessage(
++            extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep));
++          incrementMetricsCounter("jvm.pause.info-threshold", 1);
++        }
++        incrementMetricsCounter("jvm.pause.extraSleepTime", extraSleepTime);
++        totalGcExtraSleepTime += extraSleepTime;
++        gcTimesBeforeSleep = gcTimesAfterSleep;
++      }
++    }
++
++    private void incrementMetricsCounter(String name, long count) {
++      try {
++        MetricsFactory.getMetricsInstance().incrementCounter(name, count);
++      } catch (Exception e) {
++        LOG.warn("Error Reporting JvmPauseMonitor to Metrics system", e);
++      }
++    }
++  }
++
++  /**
++   * Simple 'main' to facilitate manual testing of the pause monitor.
++   *
++   * This main function just leaks memory into a list. Running this class
++   * with a 1GB heap will very quickly go into "GC hell" and result in
++   * log messages about the GC pauses.
++   */
++  public static void main(String []args) throws Exception {
++    new JvmPauseMonitor(new Configuration()).start();
++    List<String> list = Lists.newArrayList();
++    int i = 0;
++    while (true) {
++      list.add(String.valueOf(i++));
++    }
++  }
++}
+diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java
+new file mode 100644
+index 0000000..14f7afb
+--- /dev/null
++++ b/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java
+@@ -0,0 +1,262 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hadoop.hive.common.metrics;
++
++import org.apache.hadoop.hive.common.metrics.common.Metrics;
++import org.apache.hadoop.hive.conf.HiveConf;
++
++import java.io.IOException;
++import java.lang.management.ManagementFactory;
++import java.util.HashMap;
++
++import javax.management.MBeanServer;
++import javax.management.MalformedObjectNameException;
++import javax.management.ObjectName;
++
++/**
++ * This class may eventually get superseded by org.apache.hadoop.hive.common.metrics2.Metrics.
++ *
++ * Metrics Subsystem  -  allows exposure of a number of named parameters/counters
++ * via jmx, intended to be used as a static subsystem
++ *
++ * Has a couple of primary ways it can be used:
++ * (i) Using the set and get methods to set and get named parameters
++ * (ii) Using the incrementCounter method to increment and set named
++ * parameters in one go, rather than having to make a get and then a set.
++ * (iii) Using the startScope and endScope methods to start and end
++ * named "scopes" that record the number of times they've been
++ * instantiated and amount of time(in milliseconds) spent inside
++ * the scopes.
++ */
++public class LegacyMetrics implements Metrics {
++
++  private LegacyMetrics() {
++    // block
++  }
++
++  /**
++   * MetricsScope : A class that encapsulates an idea of a metered scope.
++   * Instantiating a named scope and then closing it exposes two counters:
++   *   (i) a "number of calls" counter ( <name>.n ), and
++   *  (ii) a "number of msecs spent between scope open and close" counter. ( <name>.t)
++   */
++  public static class MetricsScope {
++
++    final LegacyMetrics metrics;
++
++    final String name;
++    final String numCounter;
++    final String timeCounter;
++    final String avgTimeCounter;
++
++    private boolean isOpen = false;
++    private Long startTime = null;
++
++    /**
++     * Instantiates a named scope - intended to only be called by Metrics, so locally scoped.
++     * @param name - name of the variable
++     * @throws IOException
++     */
++    private MetricsScope(String name, LegacyMetrics metrics) throws IOException {
++      this.metrics = metrics;
++      this.name = name;
++      this.numCounter = name + ".n";
++      this.timeCounter = name + ".t";
++      this.avgTimeCounter = name + ".avg_t";
++      open();
++    }
++
++    public Long getNumCounter() throws IOException {
++      return (Long) metrics.get(numCounter);
++    }
++
++    public Long getTimeCounter() throws IOException {
++      return (Long) metrics.get(timeCounter);
++    }
++
++    /**
++     * Opens scope, and makes note of the time started, increments run counter
++     * @throws IOException
++     *
++     */
++    public void open() throws IOException {
++      if (!isOpen) {
++        isOpen = true;
++        startTime = System.currentTimeMillis();
++      } else {
++        throw new IOException("Scope named " + name + " is not closed, cannot be opened.");
++      }
++    }
++
++    /**
++     * Closes scope, and records the time taken
++     * @throws IOException
++     */
++    public void close() throws IOException {
++      if (isOpen) {
++        Long endTime = System.currentTimeMillis();
++        synchronized(metrics) {
++          Long num = metrics.incrementCounter(numCounter);
++          Long time = metrics.incrementCounter(timeCounter, endTime - startTime);
++          if (num != null && time != null) {
++            metrics.set(avgTimeCounter, Double.valueOf(time.doubleValue() / num.doubleValue()));
++          }
++        }
++      } else {
++        throw new IOException("Scope named " + name + " is not open, cannot be closed.");
++      }
++      isOpen = false;
++    }
++
++
++    /**
++     * Closes scope if open, and reopens it
++     * @throws IOException
++     */
++    public void reopen() throws IOException {
++      if(isOpen) {
++        close();
++      }
++      open();
++    }
++
++  }
++
++  private static final MetricsMBean metrics = new MetricsMBeanImpl();
++
++  private static final ObjectName oname;
++  static {
++    try {
++      oname = new ObjectName(
++        "org.apache.hadoop.hive.common.metrics:type=MetricsMBean");
++    } catch (MalformedObjectNameException mone) {
++      throw new RuntimeException(mone);
++    }
++  }
++
++
++  private static final ThreadLocal<HashMap<String, MetricsScope>> threadLocalScopes
++    = new ThreadLocal<HashMap<String, MetricsScope>>() {
++    @Override
++    protected HashMap<String, MetricsScope> initialValue() {
++      return new HashMap<String, MetricsScope>();
++    }
++  };
++
++  private boolean initialized = false;
++
++  public void init(HiveConf conf) throws Exception {
++    if (!initialized) {
++      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
++      mbs.registerMBean(metrics, oname);
++      initialized = true;
++    }
++  }
++
++  public boolean isInitialized() {
++    return initialized;
++  }
++
++  public Long incrementCounter(String name) throws IOException{
++    if (!initialized) {
++      return null;
++    }
++    return incrementCounter(name,Long.valueOf(1));
++  }
++
++  public Long incrementCounter(String name, long increment) throws IOException{
++    if (!initialized) {
++      return null;
++    }
++    Long value;
++    synchronized(metrics) {
++      if (!metrics.hasKey(name)) {
++        value = Long.valueOf(increment);
++        set(name, value);
++      } else {
++        value = ((Long)get(name)) + increment;
++        set(name, value);
++      }
++    }
++    return value;
++  }
++
++  public void set(String name, Object value) throws IOException{
++    if (!initialized) {
++      return;
++    }
++    metrics.put(name,value);
++  }
++
++  public Object get(String name) throws IOException{
++    if (!initialized) {
++      return null;
++    }
++    return metrics.get(name);
++  }
++
++  public void startScope(String name) throws IOException{
++    if (!initialized) {
++      return;
++    }
++    if (threadLocalScopes.get().containsKey(name)) {
++      threadLocalScopes.get().get(name).open();
++    } else {
++      threadLocalScopes.get().put(name, new MetricsScope(name, this));
++    }
++  }
++
++  public MetricsScope getScope(String name) throws IOException {
++    if (!initialized) {
++      return null;
++    }
++    if (threadLocalScopes.get().containsKey(name)) {
++      return threadLocalScopes.get().get(name);
++    } else {
++      throw new IOException("No metrics scope named " + name);
++    }
++  }
++
++  public void endScope(String name) throws IOException{
++    if (!initialized) {
++      return;
++    }
++    if (threadLocalScopes.get().containsKey(name)) {
++      threadLocalScopes.get().get(name).close();
++    }
++  }
++
++  /**
++   * Resets the static context state to initial.
++   * Used primarily for testing purposes.
++   *
++   * Note that threadLocalScopes ThreadLocal is *not* cleared in this call.
++   */
++  public void deInit() throws Exception {
++    synchronized (metrics) {
++      if (initialized) {
++        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
++        if (mbs.isRegistered(oname)) {
++          mbs.unregisterMBean(oname);
++        }
++        metrics.clear();
++        initialized = false;
++      }
++    }
++  }
++}
+diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java
+deleted file mode 100644
+index 01c9d1d..0000000
+--- a/common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java
++++ /dev/null
+@@ -1,253 +0,0 @@
+-/**
+- * Licensed to the Apache Software Foundation (ASF) under one
+- * or more contributor license agreements.  See the NOTICE file
+- * distributed with this work for additional information
+- * regarding copyright ownership.  The ASF licenses this file
+- * to you under the Apache License, Version 2.0 (the
+- * "License"); you may not use this file except in compliance
+- * with the License.  You may obtain a copy of the License at
+- *
+- *     http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-package org.apache.hadoop.hive.common.metrics;
+-
+-import java.io.IOException;
+-import java.lang.management.ManagementFactory;
+-import java.util.HashMap;
+-
+-import javax.management.MBeanServer;
+-import javax.management.MalformedObjectNameException;
+-import javax.management.ObjectName;
+-
+-/**
+- * Metrics Subsystem  -  allows exposure of a number of named parameters/counters
+- * via jmx, intended to be used as a static subsystem
+- *
+- * Has a couple of primary ways it can be used:
+- * (i) Using the set and get methods to set and get named parameters
+- * (ii) Using the incrementCounter method to increment and set named
+- * parameters in one go, rather than having to make a get and then a set.
+- * (iii) Using the startScope and endScope methods to start and end
+- * named "scopes" that record the number of times they've been
+- * instantiated and amount of time(in milliseconds) spent inside
+- * the scopes.
+- */
+-public class Metrics {
+-
+-  private Metrics() {
+-    // block
+-  }
+-
+-  /**
+-   * MetricsScope : A class that encapsulates an idea of a metered scope.
+-   * Instantiating a named scope and then closing it exposes two counters:
+-   *   (i) a "number of calls" counter ( <name>.n ), and
+-   *  (ii) a "number of msecs spent between scope open and close" counter. ( <name>.t)
+-   */
+-  public static class MetricsScope {
+-
+-    final String name;
+-    final String numCounter;
+-    final String timeCounter;
+-    final String avgTimeCounter;
+-
+-    private boolean isOpen = false;
+-    private Long startTime = null;
+-
+-    /**
+-     * Instantiates a named scope - intended to only be called by Metrics, so locally scoped.
+-     * @param name - name of the variable
+-     * @throws IOException
+-     */
+-    private MetricsScope(String name) throws IOException {
+-      this.name = name;
+-      this.numCounter = name + ".n";
+-      this.timeCounter = name + ".t";
+-      this.avgTimeCounter = name + ".avg_t";
+-      open();
+-    }
+-
+-    public Long getNumCounter() throws IOException {
+-      return (Long)Metrics.get(numCounter);
+-    }
+-
+-    public Long getTimeCounter() throws IOException {
+-      return (Long)Metrics.get(timeCounter);
+-    }
+-
+-    /**
+-     * Opens scope, and makes note of the time started, increments run counter
+-     * @throws IOException
+-     *
+-     */
+-    public void open() throws IOException {
+-      if (!isOpen) {
+-        isOpen = true;
+-        startTime = System.currentTimeMillis();
+-      } else {
+-        throw new IOException("Scope named " + name + " is not closed, cannot be opened.");
+-      }
+-    }
+-
+-    /**
+-     * Closes scope, and records the time taken
+-     * @throws IOException
+-     */
+-    public void close() throws IOException {
+-      if (isOpen) {
+-        Long endTime = System.currentTimeMillis();
+-        synchronized(metrics) {
+-          Long num = Metrics.incrementCounter(numCounter);
+-          Long time = Metrics.incrementCounter(timeCounter, endTime - startTime);
+-          if (num != null && time != null) {
+-            Metrics.set(avgTimeCounter, Double.valueOf(time.doubleValue() / num.doubleValue()));
+-          }
+-        }
+-      } else {
+-        throw new IOException("Scope named " + name + " is not open, cannot be closed.");
+-      }
+-      isOpen = false;
+-    }
+-
+-
+-    /**
+-     * Closes scope if open, and reopens it
+-     * @throws IOException
+-     */
+-    public void reopen() throws IOException {
+-      if(isOpen) {
+-        close();
+-      }
+-      open();
+-    }
+-
+-  }
+-
+-  private static final MetricsMBean metrics = new MetricsMBeanImpl();
+-
+-  private static final ObjectName oname;
+-  static {
+-    try {
+-      oname = new ObjectName(
+-        "org.apache.hadoop.hive.common.metrics:type=MetricsMBean");
+-    } catch (MalformedObjectNameException mone) {
+-      throw new RuntimeException(mone);
+-    }
+-  }
+-
+-
+-  private static final ThreadLocal<HashMap<String, MetricsScope>> threadLocalScopes
+-    = new ThreadLocal<HashMap<String, MetricsScope>>() {
+-    @Override
+-    protected HashMap<String, MetricsScope> initialValue() {
+-      return new HashMap<String, MetricsScope>();
+-    }
+-  };
+-
+-  private static boolean initialized = false;
+-
+-  public static void init() throws Exception {
+-    synchronized (metrics) {
+-      if (!initialized) {
+-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+-        mbs.registerMBean(metrics, oname);
+-        initialized = true;
+-      }
+-    }
+-  }
+-
+-  public static Long incrementCounter(String name) throws IOException{
+-    if (!initialized) {
+-      return null;
+-    }
+-    return incrementCounter(name,Long.valueOf(1));
+-  }
+-
+-  public static Long incrementCounter(String name, long increment) throws IOException{
+-    if (!initialized) {
+-      return null;
+-    }
+-    Long value;
+-    synchronized(metrics) {
+-      if (!metrics.hasKey(name)) {
+-        value = Long.valueOf(increment);
+-        set(name, value);
+-      } else {
+-        value = ((Long)get(name)) + increment;
+-        set(name, value);
+-      }
+-    }
+-    return value;
+-  }
+-
+-  public static void set(String name, Object value) throws IOException{
+-    if (!initialized) {
+-      return;
+-    }
+-    metrics.put(name,value);
+-  }
+-
+-  public static Object get(String name) throws IOException{
+-    if (!initialized) {
+-      return null;
+-    }
+-    return metrics.get(name);
+-  }
+-
+-  public static MetricsScope startScope(String name) throws IOException{
+-    if (!initialized) {
+-      return null;
+-    }
+-    if (threadLocalScopes.get().containsKey(name)) {
+-      threadLocalScopes.get().get(name).open();
+-    } else {
+-      threadLocalScopes.get().put(name, new MetricsScope(name));
+-    }
+-    return threadLocalScopes.get().get(name);
+-  }
+-
+-  public static MetricsScope getScope(String name) throws IOException {
+-    if (!initialized) {
+-      return null;
+-    }
+-    if (threadLocalScopes.get().containsKey(name)) {
+-      return threadLocalScopes.get().get(name);
+-    } else {
+-      throw new IOException("No metrics scope named " + name);
+-    }
+-  }
+-
+-  public static void endScope(String name) throws IOException{
+-    if (!initialized) {
+-      return;
+-    }
+-    if (threadLocalScopes.get().containsKey(name)) {
+-      threadLocalScopes.get().get(name).close();
+-    }
+-  }
+-
+-  /**
+-   * Resets the static context state to initial.
+-   * Used primarily for testing purposes.
+-   *
+-   * Note that threadLocalScopes ThreadLocal is *not* cleared in this call.
+-   */
+-  static void uninit() throws Exception {
+-    synchronized (metrics) {
+-      if (initialized) {
+-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+-        if (mbs.isRegistered(oname)) {
+-          mbs.unregisterMBean(oname);
+-        }
+-        metrics.clear();
+-        initialized = false;
+-      }
+-    }
+-  }
+-}
+diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java
+new file mode 100644
+index 0000000..13a5336
+--- /dev/null
++++ b/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java
+@@ -0,0 +1,68 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hadoop.hive.common.metrics.common;
++
++import java.io.IOException;
++
++import org.apache.hadoop.hive.conf.HiveConf;
++
++import java.io.IOException;
++
++/**
++ * Generic Metics interface.
++ */
++public interface Metrics {
++
++  /**
++   * Initialize Metrics system with given Hive configuration.
++   * @param conf
++   */
++  public void init(HiveConf conf) throws Exception;
++
++  /**
++   * Deinitializes the Metrics system.
++   */
++  public void deInit() throws Exception;
++
++  /**
++   * @param name
++   * @throws IOException
++   */
++  public void startScope(String name) throws IOException;
++
++  public void endScope(String name) throws IOException;
++
++  //Counter-related methods
++
++  /**
++   * Increments a counter of the given name by 1.
++   * @param name
++   * @return
++   * @throws IOException
++   */
++  public Long incrementCounter(String name) throws IOException;
++
++  /**
++   * Increments a counter of the given name by "increment"
++   * @param name
++   * @param increment
++   * @return
++   * @throws IOException
++   */
++  public Long incrementCounter(String name, long increment) throws IOException;
++}
+diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsFactory.java b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsFactory.java
+new file mode 100644
+index 0000000..12a309d
+--- /dev/null
++++ b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsFactory.java
+@@ -0,0 +1,48 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hadoop.hive.common.metrics.common;
++
++import org.apache.hadoop.hive.conf.HiveConf;
++import org.apache.hadoop.util.ReflectionUtils;
++
++/**
++ * Class that manages a static Metric instance for this process.
++ */
++public class MetricsFactory {
++
++  private static Metrics metrics;
++  private static Object initLock = new Object();
++
++  public synchronized static void init(HiveConf conf) throws Exception {
++    if (metrics == null) {
++      metrics = (Metrics) ReflectionUtils.newInstance(conf.getClassByName(
++        conf.getVar(HiveConf.ConfVars.HIVE_METRICS_CLASS)), conf);
++    }
++    metrics.init(conf);
++  }
++
++  public synchronized static Metrics getMetricsInstance() {
++    return metrics;
++  }
++
++  public synchronized static void deInit() throws Exception {
++    if (metrics != null) {
++      metrics.deInit();
++    }
++  }
++}
+diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
+new file mode 100644
+index 0000000..e59da99
+--- /dev/null
++++ b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
+@@ -0,0 +1,366 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.hadoop.hive.common.metrics.metrics2;
++
++import com.codahale.metrics.ConsoleReporter;
++import com.codahale.metrics.Counter;
++import com.codahale.metrics.ExponentiallyDecayingReservoir;
++import com.codahale.metrics.JmxReporter;
++import com.codahale.metrics.Metric;
++import com.codahale.metrics.MetricRegistry;
++import com.codahale.metrics.MetricSet;
++import com.codahale.metrics.Timer;
++import com.codahale.metrics.json.MetricsModule;
++import com.codahale.metrics.jvm.BufferPoolMetricSet;
++import com.codahale.metrics.jvm.ClassLoadingGaugeSet;
++import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
++import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
++import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
++import com.fasterxml.jackson.databind.ObjectMapper;
++import com.google.common.annotations.VisibleForTesting;
++import com.google.common.base.Splitter;
++import com.google.common.cache.CacheBuilder;
++import com.google.common.cache.CacheLoader;
++import com.google.common.cache.LoadingCache;
++import com.google.common.collect.Lists;
++import org.apache.commons.logging.Log;
++import org.apache.commons.logging.LogFactory;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.Path;
++import org.apache.hadoop.fs.permission.FsPermission;
++import org.apache.hadoop.hive.conf.HiveConf;
++
++import java.io.BufferedReader;
++import java.io.BufferedWriter;
++import java.io.Closeable;
++import java.io.IOException;
++import java.io.OutputStreamWriter;
++import java.lang.management.ManagementFactory;
++import java.util.HashMap;
++import java.util.HashSet;
++import java.util.List;
++import java.util.Map;
++import java.util.Set;
++import java.util.TimerTask;
++import java.util.concurrent.ExecutionException;
++import java.util.concurrent.TimeUnit;
++import java.util.concurrent.locks.Lock;
++import java.util.concurrent.locks.ReentrantLock;
++
++/**
++ * Codahale-backed Metrics implementation.
++ */
++public class CodahaleMetrics implements org.apache.hadoop.hive.common.metrics.common.Metrics {
++  public static final String API_PREFIX = "api_";
++  public static final Log LOGGER = LogFactory.getLog(CodahaleMetrics.class);
++
++  public final MetricRegistry metricRegistry = new MetricRegistry();
++  private final Lock timersLock = new ReentrantLock();
++  private final Lock countersLock = new ReentrantLock();
++
++  private LoadingCache<String, Timer> timers;
++  private LoadingCache<String, Counter> counters;
++
++  private boolean initialized = false;
++  private HiveConf conf;
++  private final Set<Closeable> reporters = new HashSet<Closeable>();
++
++  private final ThreadLocal<HashMap<String, MetricsScope>> threadLocalScopes
++    = new ThreadLocal<HashMap<String, MetricsScope>>() {
++    @Override
++    protected HashMap<String, MetricsScope> initialValue() {
++      return new HashMap<String, MetricsScope>();
++    }
++  };
++
++  public static class MetricsScope {
++
++    final String name;
++    final Timer timer;
++    Timer.Context timerContext;
++    CodahaleMetrics metrics;
++
++    private boolean isOpen = false;
++
++    /**
++     * Instantiates a named scope - intended to only be called by Metrics, so locally scoped.
++     * @param name - name of the variable
++     * @throws IOException
++     */
++    private MetricsScope(String name, CodahaleMetrics metrics) throws IOException {
++      this.name = name;
++      this.metrics = metrics;
++      this.timer = metrics.getTimer(name);
++      open();
++    }
++
++    /**
++     * Opens scope, and makes note of the time started, increments run counter
++     * @throws IOException
++     *
++     */
++    public void open() throws IOException {
++      if (!isOpen) {
++        isOpen = true;
++        this.timerContext = timer.time();
++      } else {
++        throw new IOException("Scope named " + name + " is not closed, cannot be opened.");
++      }
++    }
++
++    /**
++     * Closes scope, and records the time taken
++     * @throws IOException
++     */
++    public void close() throws IOException {
++      if (isOpen) {
++        timerContext.close();
++
++      } else {
++        throw new IOException("Scope named " + name + " is not open, cannot be closed.");
++      }
++      isOpen = false;
++    }
++  }
++
++  public synchronized void init(HiveConf conf) throws Exception {
++    if (initialized) {
++      return;
++    }
++
++    this.conf = conf;
++    //Codahale artifacts are lazily-created.
++    timers = CacheBuilder.newBuilder().build(
++      new CacheLoader<String, com.codahale.metrics.Timer>() {
++        @Override
++        public com.codahale.metrics.Timer load(String key) throws Exception {
++          Timer timer = new Timer(new ExponentiallyDecayingReservoir());
++          metricRegistry.register(key, timer);
++          return timer;
++        }
++      }
++    );
++    counters = CacheBuilder.newBuilder().build(
++      new CacheLoader<String, Counter>() {
++        @Override
++        public Counter load(String key) throws Exception {
++          Counter counter = new Counter();
++          metricRegistry.register(key, counter);
++          return counter;
++        }
++      }
++    );
++
++    //register JVM metrics
++    registerAll("gc", new GarbageCollectorMetricSet());
++    registerAll("buffers", new BufferPoolMetricSet(ManagementFactory.getPlatformMBeanServer()));
++    registerAll("memory", new MemoryUsageGaugeSet());
++    registerAll("threads", new ThreadStatesGaugeSet());
++    registerAll("classLoading", new ClassLoadingGaugeSet());
++
++    //Metrics reporter
++    Set<MetricsReporting> finalReporterList = new HashSet<MetricsReporting>();
++    List<String> metricsReporterNames = Lists.newArrayList(
++      Splitter.on(",").trimResults().omitEmptyStrings().split(conf.getVar(HiveConf.ConfVars.HIVE_METRICS_REPORTER)));
++
++    if(metricsReporterNames != null) {
++      for (String metricsReportingName : metricsReporterNames) {
++        try {
++          MetricsReporting reporter = MetricsReporting.valueOf(metricsReportingName.trim().toUpperCase());
++          finalReporterList.add(reporter);
++        } catch (IllegalArgumentException e) {
++          LOGGER.warn("Metrics reporter skipped due to invalid configured reporter: " + metricsReportingName);
++        }
++      }
++    }
++    initReporting(finalReporterList);
++    initialized = true;
++  }
++
++
++  public synchronized void deInit() throws Exception {
++    if (initialized) {
++      if (reporters != null) {
++        for (Closeable reporter : reporters) {
++          reporter.close();
++        }
++      }
++      for (Map.Entry<String, Metric> metric : metricRegistry.getMetrics().entrySet()) {
++        metricRegistry.remove(metric.getKey());
++      }
++      timers.invalidateAll();
++      counters.invalidateAll();
++      initialized = false;
++    }
++  }
++
++  public void startScope(String name) throws IOException {
++    synchronized (this) {
++      if (!initialized) {
++        return;
++      }
++    }
++    name = API_PREFIX + name;
++    if (threadLocalScopes.get().containsKey(name)) {
++      threadLocalScopes.get().get(name).open();
++    } else {
++      threadLocalScopes.get().put(name, new MetricsScope(name, this));
++    }
++  }
++
++  public void endScope(String name) throws IOException{
++    synchronized (this) {
++      if (!initialized) {
++        return;
++      }
++    }
++    name = API_PREFIX + name;
++    if (threadLocalScopes.get().containsKey(name)) {
++      threadLocalScopes.get().get(name).close();
++    }
++  }
++
++  public Long incrementCounter(String name) throws IOException {
++    return incrementCounter(name, 1);
++  }
++
++  public Long incrementCounter(String name, long increment) throws IOException {
++    String key = name;
++    try {
++      countersLock.lock();
++      counters.get(key).inc(increment);
++      return counters.get(key).getCount();
++    } catch(ExecutionException ee) {
++      throw new RuntimeException(ee);
++    } finally {
++      countersLock.unlock();
++    }
++  }
++
++  // This method is necessary to synchronize lazy-creation to the timers.
++  private Timer getTimer(String name) throws IOException {
++    String key = name;
++    try {
++      timersLock.lock();
++      Timer timer = timers.get(key);
++      return timer;
++    } catch (ExecutionException e) {
++      throw new IOException(e);
++    } finally {
++      timersLock.unlock();
++    }
++  }
++
++  private void registerAll(String prefix, MetricSet metricSet) {
++    for (Map.Entry<String, Metric> entry : metricSet.getMetrics().entrySet()) {
++      if (entry.getValue() instanceof MetricSet) {
++        registerAll(prefix + "." + entry.getKey(), (MetricSet) entry.getValue());
++      } else {
++        metricRegistry.register(prefix + "." + entry.getKey(), entry.getValue());
++      }
++    }
++  }
++
++  @VisibleForTesting
++  public MetricRegistry getMetricRegistry() {
++    return metricRegistry;
++  }
++
++  /**
++   * Should be only called once to initialize the reporters
++   */
++  private void initReporting(Set<MetricsReporting> reportingSet) throws Exception {
++    for (MetricsReporting reporting : reportingSet) {
++      switch(reporting) {
++        case CONSOLE:
++          final ConsoleReporter consoleReporter = ConsoleReporter.forRegistry(metricRegistry)
++            .convertRatesTo(TimeUnit.SECONDS)
++            .convertDurationsTo(TimeUnit.MILLISECONDS)
++            .build();
++          consoleReporter.start(1, TimeUnit.SECONDS);
++          reporters.add(consoleReporter);
++          break;
++        case JMX:
++          final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry)
++            .convertRatesTo(TimeUnit.SECONDS)
++            .convertDurationsTo(TimeUnit.MILLISECONDS)
++            .build();
++          jmxReporter.start();
++          reporters.add(jmxReporter);
++          break;
++        case JSON_FILE:
++          final JsonFileReporter jsonFileReporter = new JsonFileReporter();
++          jsonFileReporter.start();
++          reporters.add(jsonFileReporter);
++          break;
++      }
++    }
++  }
++
++  class JsonFileReporter implements Closeable {
++    private ObjectMapper jsonMapper = null;
++    private java.util.Timer timer = null;
++
++    public void start() {
++      this.jsonMapper = new ObjectMapper().registerModule(new MetricsModule(TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS, false));
++      this.timer = new java.util.Timer(true);
++
++      long time = conf.getTimeVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_INTERVAL, TimeUnit.MILLISECONDS);
++      final String pathString = conf.getVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_LOCATION);
++
++      timer.schedule(new TimerTask() {
++        @Override
++        public void run() {
++          BufferedWriter bw = null;
++          try {
++            String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(metricRegistry);
++            Path tmpPath = new Path(pathString + ".tmp");
++            FileSystem fs = FileSystem.get(conf);
++            fs.delete(tmpPath, true);
++            bw = new BufferedWriter(new OutputStreamWriter(fs.create(tmpPath, true)));
++            bw.write(json);
++            bw.close();
++
++            Path path = new Path(pathString);
++            fs.rename(tmpPath, path);
++            fs.setPermission(path, FsPermission.createImmutable((short) 0644));
++          } catch (Exception e) {
++            LOGGER.warn("Error writing JSON Metrics to file", e);
++          } finally {
++            try {
++              if (bw != null) {
++                bw.close();
++              }
++            } catch (IOException e) {
++              //Ignore.
++            }
++          }
++
++
++        }
++      }, 0, time);
++    }
++
++    public void close() {
++      if (timer != null) {
++        this.timer.cancel();
++      }
++    }
++  }
++}
+diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/MetricsReporting.java b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/MetricsReporting.java
+new file mode 100644
+index 0000000..643246f
+--- /dev/null
++++ b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/MetricsReporting.java
+@@ -0,0 +1,27 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hadoop.hive.common.metrics.metrics2;
++
++/**
++ * Reporting options for org.apache.hadoop.hive.common.metrics.metrics2.Metrics.
++ */
++public enum MetricsReporting {
++  JMX,
++  CONSOLE,
++  JSON_FILE
++}
+diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+index 49b8f97..55a79a9 100644
+--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
++++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+@@ -645,6 +645,7 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
+         "Maximum cache full % after which the cache cleaner thread kicks in."),
+     METASTORE_AGGREGATE_STATS_CACHE_CLEAN_UNTIL("hive.metastore.aggregate.stats.cache.clean.until", (float) 0.8,
+         "The cleaner thread cleans until cache reaches this % full size."),
++    METASTORE_METRICS("hive.metastore.metrics.enabled", false, "Enable metrics on the metastore."),
+
+     // Parameters for exporting metadata on table drop (requires the use of the)
+     // org.apache.hadoop.hive.ql.parse.MetaDataExportListener preevent listener
+@@ -1688,6 +1689,7 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
+         " EXECUTION: Log completion of tasks\n" +
+         " PERFORMANCE: Execution + Performance logs \n" +
+         " VERBOSE: All logs" ),
++    HIVE_SERVER2_METRICS_ENABLED("hive.server2.metrics.enabled", false, "Enable metrics on the HiveServer2."),
+     // logging configuration
+     HIVE_LOG4J_FILE("hive.log4j.file", "",
+         "Hive log4j configuration file.\n" +
+@@ -1715,7 +1717,21 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
+     HIVE_AUTOGEN_COLUMNALIAS_PREFIX_INCLUDEFUNCNAME(
+         "hive.autogen.columnalias.prefix.includefuncname", false,
+         "Whether to include function name in the column alias auto generated by Hive."),
+-
++    HIVE_METRICS_CLASS("hive.service.metrics.class",
++        "org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics",
++        new StringSet(
++            "org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics",
++            "org.apache.hadoop.hive.common.metrics.LegacyMetrics"),
++        "Hive metrics subsystem implementation class."),
++    HIVE_METRICS_REPORTER("hive.service.metrics.reporter", "JSON_FILE, JMX",
++        "Reporter type for metric class org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics, comma separated list of JMX, CONSOLE, JSON_FILE"),
++    HIVE_METRICS_JSON_FILE_LOCATION("hive.service.metrics.file.location", "file:///tmp/my-logging.properties",
++        "For metric class org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics JSON_FILE reporter, the location of JSON metrics file.  " +
++        "This file will get overwritten at every interval."),
++    HIVE_METRICS_JSON_FILE_INTERVAL("hive.service.metrics.file.frequency", "5s",
++        new TimeValidator(TimeUnit.MILLISECONDS),
++        "For metric class org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics JSON_FILE reporter, " +
++        "the frequency of updating JSON metrics file."),
+     HIVE_PERF_LOGGER("hive.exec.perf.logger", "org.apache.hadoop.hive.ql.log.PerfLogger",
+         "The class responsible for logging client side performance metrics. \n" +
+         "Must be a subclass of org.apache.hadoop.hive.ql.log.PerfLogger"),
+diff --git a/common/src/test/org/apache/hadoop/hive/common/metrics/TestLegacyMetrics.java b/common/src/test/org/apache/hadoop/hive/common/metrics/TestLegacyMetrics.java
+new file mode 100644
+index 0000000..c14c7ee
+--- /dev/null
++++ b/common/src/test/org/apache/hadoop/hive/common/metrics/TestLegacyMetrics.java
+@@ -0,0 +1,295 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */ ++package org.apache.hadoop.hive.common.metrics; ++ ++import java.io.IOException; ++import java.lang.management.ManagementFactory; ++import java.util.concurrent.Callable; ++import java.util.concurrent.ExecutorService; ++import java.util.concurrent.Executors; ++import java.util.concurrent.TimeUnit; ++ ++import javax.management.Attribute; ++import javax.management.MBeanAttributeInfo; ++import javax.management.MBeanInfo; ++import javax.management.MBeanOperationInfo; ++import javax.management.MBeanServer; ++import javax.management.ObjectName; ++ ++import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; ++import org.apache.hadoop.hive.common.metrics.LegacyMetrics.MetricsScope; ++import org.apache.hadoop.hive.conf.HiveConf; ++import org.junit.After; ++import org.junit.Before; ++import org.junit.Test; ++import static org.junit.Assert.*; ++ ++public class TestLegacyMetrics { ++ ++ private static final String scopeName = "foo"; ++ private static final long periodMs = 50L; ++ private static LegacyMetrics metrics; ++ ++ @Before ++ public void before() throws Exception { ++ MetricsFactory.deInit(); ++ HiveConf conf = new HiveConf(); ++ conf.setVar(HiveConf.ConfVars.HIVE_METRICS_CLASS, LegacyMetrics.class.getCanonicalName()); ++ MetricsFactory.init(conf); ++ metrics = (LegacyMetrics) MetricsFactory.getMetricsInstance(); ++ } ++ ++ @After ++ public void after() throws Exception { ++ MetricsFactory.deInit(); ++ } ++ ++ @Test ++ public void testMetricsMBean() throws Exception { ++ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); ++ final ObjectName oname = new ObjectName( ++ "org.apache.hadoop.hive.common.metrics:type=MetricsMBean"); ++ MBeanInfo mBeanInfo = mbs.getMBeanInfo(oname); ++ // check implementation class: ++ assertEquals(MetricsMBeanImpl.class.getName(), mBeanInfo.getClassName()); ++ ++ // check reset operation: ++ MBeanOperationInfo[] oops = mBeanInfo.getOperations(); ++ boolean resetFound = false; ++ for (MBeanOperationInfo op : oops) { ++ if ("reset".equals(op.getName())) { ++ resetFound = true; ++ break; ++ } ++ } ++ assertTrue(resetFound); ++ ++ // add metric with a non-null value: ++ Attribute attr = new Attribute("fooMetric", Long.valueOf(-77)); ++ mbs.setAttribute(oname, attr); ++ ++ mBeanInfo = mbs.getMBeanInfo(oname); ++ MBeanAttributeInfo[] attributeInfos = mBeanInfo.getAttributes(); ++ assertEquals(1, attributeInfos.length); ++ boolean attrFound = false; ++ for (MBeanAttributeInfo info : attributeInfos) { ++ if ("fooMetric".equals(info.getName())) { ++ assertEquals("java.lang.Long", info.getType()); ++ assertTrue(info.isReadable()); ++ assertTrue(info.isWritable()); ++ assertFalse(info.isIs()); ++ ++ attrFound = true; ++ break; ++ } ++ } ++ assertTrue(attrFound); ++ ++ // check metric value: ++ Object v = mbs.getAttribute(oname, "fooMetric"); ++ assertEquals(Long.valueOf(-77), v); ++ ++ // reset the bean: ++ Object result = mbs.invoke(oname, "reset", new Object[0], new String[0]); ++ assertNull(result); ++ ++ // the metric value must be zeroed: ++ v = mbs.getAttribute(oname, "fooMetric"); ++ assertEquals(Long.valueOf(0), v); ++ } ++ ++ private <T> void expectIOE(Callable<T> c) throws Exception { ++ try { ++ T t = c.call(); ++ fail("IOE expected but [" + t + "] was returned."); ++ } catch (IOException ioe) { ++ // ok, expected ++ } ++ } ++ ++ @Test ++ public void testScopeSingleThread() throws Exception { ++ metrics.startScope(scopeName); ++ final MetricsScope fooScope = metrics.getScope(scopeName); ++ // the time and number counters become available only after
the 1st ++ // scope close: ++ expectIOE(new Callable<Long>() { ++ @Override ++ public Long call() throws Exception { ++ Long num = fooScope.getNumCounter(); ++ return num; ++ } ++ }); ++ expectIOE(new Callable<Long>() { ++ @Override ++ public Long call() throws Exception { ++ Long time = fooScope.getTimeCounter(); ++ return time; ++ } ++ }); ++ // cannot open scope that is already open: ++ expectIOE(new Callable<Void>() { ++ @Override ++ public Void call() throws Exception { ++ fooScope.open(); ++ return null; ++ } ++ }); ++ ++ assertSame(fooScope, metrics.getScope(scopeName)); ++ Thread.sleep(periodMs + 1); ++ // 1st close: ++ // closing of open scope should be ok: ++ metrics.endScope(scopeName); ++ expectIOE(new Callable<Void>() { ++ @Override ++ public Void call() throws Exception { ++ metrics.endScope(scopeName); // closing of closed scope not allowed ++ return null; ++ } ++ }); ++ ++ assertEquals(Long.valueOf(1), fooScope.getNumCounter()); ++ final long t1 = fooScope.getTimeCounter().longValue(); ++ assertTrue(t1 > periodMs); ++ ++ assertSame(fooScope, metrics.getScope(scopeName)); ++ ++ // opening allowed after closing: ++ metrics.startScope(scopeName); ++ // opening of already open scope not allowed: ++ expectIOE(new Callable<Void>() { ++ @Override ++ public Void call() throws Exception { ++ metrics.startScope(scopeName); ++ return null; ++ } ++ }); ++ ++ assertEquals(Long.valueOf(1), fooScope.getNumCounter()); ++ assertEquals(t1, fooScope.getTimeCounter().longValue()); ++ ++ assertSame(fooScope, metrics.getScope(scopeName)); ++ Thread.sleep(periodMs + 1); ++ // Reopening (close + open) allowed in opened state: ++ fooScope.reopen(); ++ ++ assertEquals(Long.valueOf(2), fooScope.getNumCounter()); ++ assertTrue(fooScope.getTimeCounter().longValue() > 2 * periodMs); ++ ++ Thread.sleep(periodMs + 1); ++ // 3rd close: ++ fooScope.close(); ++ ++ assertEquals(Long.valueOf(3), fooScope.getNumCounter()); ++ assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs); ++ Double avgT = (Double) metrics.get("foo.avg_t"); ++ assertTrue(avgT.doubleValue() > periodMs); ++ } ++ ++ @Test ++ public void testScopeConcurrency() throws Exception { ++ metrics.startScope(scopeName); ++ MetricsScope fooScope = metrics.getScope(scopeName); ++ final int threads = 10; ++ ExecutorService executorService = Executors.newFixedThreadPool(threads); ++ for (int i = 0; i < threads; i++) { ++ final int n = i; ++ executorService.submit(new Callable<Void>() { ++ @Override ++ public Void call() throws Exception { ++ testScopeImpl(n); ++ return null; ++ } ++ }); ++ } ++ executorService.shutdown(); ++ assertTrue(executorService.awaitTermination(periodMs * 3 * threads, TimeUnit.MILLISECONDS)); ++ ++ fooScope = metrics.getScope(scopeName); ++ assertEquals(Long.valueOf(3 * threads), fooScope.getNumCounter()); ++ assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs * threads); ++ Double avgT = (Double) metrics.get("foo.avg_t"); ++ assertTrue(avgT.doubleValue() > periodMs); ++ metrics.endScope(scopeName); ++ } ++ ++ void testScopeImpl(int n) throws Exception { ++ metrics.startScope(scopeName); ++ final MetricsScope fooScope = metrics.getScope(scopeName); ++ // cannot open scope that is already open: ++ expectIOE(new Callable<Void>() { ++ @Override ++ public Void call() throws Exception { ++ fooScope.open(); ++ return null; ++ } ++ }); ++ ++ assertSame(fooScope, metrics.getScope(scopeName)); ++ Thread.sleep(periodMs + 1); ++ // 1st close: ++ metrics.endScope(scopeName); // closing of open scope should be ok.
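Aside: the open/close contract that testScopeImpl exercises here (and that the assertions below rely on) is easy to state in isolation. The sketch below is illustrative only; the class and member names are invented, and this is not Hive's LegacyMetrics.MetricsScope implementation. It encodes the behavior the tests assert: open and close must strictly alternate (violations surface as IOException), and the num/time counters become readable only after the first close. The test body resumes below.

    import java.io.IOException;

    // Invented sketch of the scope contract that TestLegacyMetrics asserts.
    final class ScopeContractSketch {
      private boolean open = false;
      private long openedAtMs;
      private Long numCounter;   // stays null until the first close
      private Long timeCounter;  // stays null until the first close

      void open() throws IOException {
        if (open) { throw new IOException("scope is already open"); }
        open = true;
        openedAtMs = System.currentTimeMillis();
      }

      void close() throws IOException {
        if (!open) { throw new IOException("scope is already closed"); }
        open = false;
        long delta = System.currentTimeMillis() - openedAtMs;
        numCounter = (numCounter == null) ? 1L : numCounter + 1;
        timeCounter = (timeCounter == null) ? delta : timeCounter + delta;
      }

      // Legal only while open: behaves as a close immediately followed by an open.
      void reopen() throws IOException { close(); open(); }

      Long getNumCounter() throws IOException {
        if (numCounter == null) { throw new IOException("no scope completed yet"); }
        return numCounter;
      }

      Long getTimeCounter() throws IOException {
        if (timeCounter == null) { throw new IOException("no scope completed yet"); }
        return timeCounter;
      }
    }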
++ ++ assertTrue(fooScope.getNumCounter().longValue() >= 1); ++ final long t1 = fooScope.getTimeCounter().longValue(); ++ assertTrue(t1 > periodMs); ++ ++ expectIOE(new Callable<Void>() { ++ @Override ++ public Void call() throws Exception { ++ metrics.endScope(scopeName); // closing of closed scope not allowed ++ return null; ++ } ++ }); ++ ++ assertSame(fooScope, metrics.getScope(scopeName)); ++ ++ // opening allowed after closing: ++ metrics.startScope(scopeName); ++ ++ assertTrue(fooScope.getNumCounter().longValue() >= 1); ++ assertTrue(fooScope.getTimeCounter().longValue() >= t1); ++ ++ // opening of already open scope not allowed: ++ expectIOE(new Callable<Void>() { ++ @Override ++ public Void call() throws Exception { ++ metrics.startScope(scopeName); ++ return null; ++ } ++ }); ++ ++ assertSame(fooScope, metrics.getScope(scopeName)); ++ Thread.sleep(periodMs + 1); ++ // Reopening (close + open) allowed in opened state: ++ fooScope.reopen(); ++ ++ assertTrue(fooScope.getNumCounter().longValue() >= 2); ++ assertTrue(fooScope.getTimeCounter().longValue() > 2 * periodMs); ++ ++ Thread.sleep(periodMs + 1); ++ // 3rd close: ++ fooScope.close(); ++ ++ assertTrue(fooScope.getNumCounter().longValue() >= 3); ++ assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs); ++ Double avgT = (Double) metrics.get("foo.avg_t"); ++ assertTrue(avgT.doubleValue() > periodMs); ++ } ++} +diff --git a/common/src/test/org/apache/hadoop/hive/common/metrics/TestMetrics.java b/common/src/test/org/apache/hadoop/hive/common/metrics/TestMetrics.java +deleted file mode 100644 +index e85d3f8..0000000 +--- a/common/src/test/org/apache/hadoop/hive/common/metrics/TestMetrics.java ++++ /dev/null +@@ -1,286 +0,0 @@ +-/** +- * Licensed to the Apache Software Foundation (ASF) under one +- * or more contributor license agreements. See the NOTICE file +- * distributed with this work for additional information +- * regarding copyright ownership. The ASF licenses this file +- * to you under the Apache License, Version 2.0 (the +- * "License"); you may not use this file except in compliance +- * with the License. You may obtain a copy of the License at +- * +- * http://www.apache.org/licenses/LICENSE-2.0 +- * +- * Unless required by applicable law or agreed to in writing, software +- * distributed under the License is distributed on an "AS IS" BASIS, +- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +- * See the License for the specific language governing permissions and +- * limitations under the License.
+- */ +-package org.apache.hadoop.hive.common.metrics; +- +-import java.io.IOException; +-import java.lang.management.ManagementFactory; +-import java.util.concurrent.Callable; +-import java.util.concurrent.ExecutorService; +-import java.util.concurrent.Executors; +-import java.util.concurrent.TimeUnit; +- +-import javax.management.Attribute; +-import javax.management.MBeanAttributeInfo; +-import javax.management.MBeanInfo; +-import javax.management.MBeanOperationInfo; +-import javax.management.MBeanServer; +-import javax.management.ObjectName; +- +-import org.apache.hadoop.hive.common.metrics.Metrics.MetricsScope; +-import org.junit.After; +-import org.junit.Before; +-import org.junit.Test; +-import static org.junit.Assert.*; +- +-public class TestMetrics { +- +- private static final String scopeName = "foo"; +- private static final long periodMs = 50L; +- +- @Before +- public void before() throws Exception { +- Metrics.uninit(); +- Metrics.init(); +- } +- +- @After +- public void after() throws Exception { +- Metrics.uninit(); +- } +- +- @Test +- public void testMetricsMBean() throws Exception { +- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); +- final ObjectName oname = new ObjectName( +- "org.apache.hadoop.hive.common.metrics:type=MetricsMBean"); +- MBeanInfo mBeanInfo = mbs.getMBeanInfo(oname); +- // check implementation class: +- assertEquals(MetricsMBeanImpl.class.getName(), mBeanInfo.getClassName()); +- +- // check reset operation: +- MBeanOperationInfo[] oops = mBeanInfo.getOperations(); +- boolean resetFound = false; +- for (MBeanOperationInfo op : oops) { +- if ("reset".equals(op.getName())) { +- resetFound = true; +- break; +- } +- } +- assertTrue(resetFound); +- +- // add metric with a non-null value: +- Attribute attr = new Attribute("fooMetric", Long.valueOf(-77)); +- mbs.setAttribute(oname, attr); +- +- mBeanInfo = mbs.getMBeanInfo(oname); +- MBeanAttributeInfo[] attrinuteInfos = mBeanInfo.getAttributes(); +- assertEquals(1, attrinuteInfos.length); +- boolean attrFound = false; +- for (MBeanAttributeInfo info : attrinuteInfos) { +- if ("fooMetric".equals(info.getName())) { +- assertEquals("java.lang.Long", info.getType()); +- assertTrue(info.isReadable()); +- assertTrue(info.isWritable()); +- assertFalse(info.isIs()); +- +- attrFound = true; +- break; +- } +- } +- assertTrue(attrFound); +- +- // check metric value: +- Object v = mbs.getAttribute(oname, "fooMetric"); +- assertEquals(Long.valueOf(-77), v); +- +- // reset the bean: +- Object result = mbs.invoke(oname, "reset", new Object[0], new String[0]); +- assertNull(result); +- +- // the metric value must be zeroed: +- v = mbs.getAttribute(oname, "fooMetric"); +- assertEquals(Long.valueOf(0), v); +- } +- +- private <T> void expectIOE(Callable<T> c) throws Exception { +- try { +- T t = c.call(); +- fail("IOE expected but ["+t+"] was returned."); +- } catch (IOException ioe) { +- // ok, expected +- } +- } +- +- @Test +- public void testScopeSingleThread() throws Exception { +- final MetricsScope fooScope = Metrics.startScope(scopeName); +- // the time and number counters become available only after the 1st +- // scope close: +- expectIOE(new Callable<Long>() { +- @Override +- public Long call() throws Exception { +- Long num = fooScope.getNumCounter(); +- return num; +- } +- }); +- expectIOE(new Callable<Long>() { +- @Override +- public Long call() throws Exception { +- Long time = fooScope.getTimeCounter(); +- return time; +- } +- }); +- // cannot open scope that is already open: +- expectIOE(new Callable<Void>() { +- @Override +-
public Void call() throws Exception { +- fooScope.open(); +- return null; +- } +- }); +- +- assertSame(fooScope, Metrics.getScope(scopeName)); +- Thread.sleep(periodMs+1); +- // 1st close: +- // closing of open scope should be ok: +- Metrics.endScope(scopeName); +- expectIOE(new Callable<Void>() { +- @Override +- public Void call() throws Exception { +- Metrics.endScope(scopeName); // closing of closed scope not allowed +- return null; +- } +- }); +- +- assertEquals(Long.valueOf(1), fooScope.getNumCounter()); +- final long t1 = fooScope.getTimeCounter().longValue(); +- assertTrue(t1 > periodMs); +- +- assertSame(fooScope, Metrics.getScope(scopeName)); +- +- // opening allowed after closing: +- Metrics.startScope(scopeName); +- // opening of already open scope not allowed: +- expectIOE(new Callable<Void>() { +- @Override +- public Void call() throws Exception { +- Metrics.startScope(scopeName); +- return null; +- } +- }); +- +- assertEquals(Long.valueOf(1), fooScope.getNumCounter()); +- assertEquals(t1, fooScope.getTimeCounter().longValue()); +- +- assertSame(fooScope, Metrics.getScope(scopeName)); +- Thread.sleep(periodMs + 1); +- // Reopening (close + open) allowed in opened state: +- fooScope.reopen(); +- +- assertEquals(Long.valueOf(2), fooScope.getNumCounter()); +- assertTrue(fooScope.getTimeCounter().longValue() > 2 * periodMs); +- +- Thread.sleep(periodMs + 1); +- // 3rd close: +- fooScope.close(); +- +- assertEquals(Long.valueOf(3), fooScope.getNumCounter()); +- assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs); +- Double avgT = (Double)Metrics.get("foo.avg_t"); +- assertTrue(avgT.doubleValue() > periodMs); +- } +- +- @Test +- public void testScopeConcurrency() throws Exception { +- MetricsScope fooScope = Metrics.startScope(scopeName); +- final int threads = 10; +- ExecutorService executorService = Executors.newFixedThreadPool(threads); +- for (int i=0; i<threads; i++) { +- final int n = i; +- executorService.submit(new Callable<Void>() { +- @Override +- public Void call() throws Exception { +- testScopeImpl(n); +- return null; +- } +- }); +- } +- executorService.shutdown(); +- assertTrue(executorService.awaitTermination(periodMs * 3 * threads, TimeUnit.MILLISECONDS)); +- +- fooScope = Metrics.getScope(scopeName); +- assertEquals(Long.valueOf(3 * threads), fooScope.getNumCounter()); +- assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs * threads); +- Double avgT = (Double)Metrics.get("foo.avg_t"); +- assertTrue(avgT.doubleValue() > periodMs); +- Metrics.endScope(scopeName); +- } +- +- void testScopeImpl(int n) throws Exception { +- final MetricsScope fooScope = Metrics.startScope(scopeName); +- // cannot open scope that is already open: +- expectIOE(new Callable<Void>() { +- @Override +- public Void call() throws Exception { +- fooScope.open(); +- return null; +- } +- }); +- +- assertSame(fooScope, Metrics.getScope(scopeName)); +- Thread.sleep(periodMs+1); +- // 1st close: +- Metrics.endScope(scopeName); // closing of open scope should be ok.
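Aside: the file being removed here is the static-singleton twin of the TestLegacyMetrics file added above; the substance of the change is that callers stop using the static Metrics class and instead go through MetricsFactory, which instantiates the implementation named by hive.service.metrics.class. A minimal before/after sketch (the new-API calls are taken from the tests in this patch; the old calls appear in the deleted lines around this point, and the class name here is invented):

    import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
    import org.apache.hadoop.hive.conf.HiveConf;

    public class MetricsApiMigrationSketch {
      public static void main(String[] args) throws Exception {
        // Old API (this deleted file): static singleton, not configurable.
        //   Metrics.init(); Metrics.startScope("foo"); Metrics.endScope("foo"); Metrics.uninit();

        // New API: the implementation is chosen by hive.service.metrics.class.
        HiveConf conf = new HiveConf();
        conf.setVar(HiveConf.ConfVars.HIVE_METRICS_CLASS,
            "org.apache.hadoop.hive.common.metrics.LegacyMetrics");
        MetricsFactory.init(conf);
        MetricsFactory.getMetricsInstance().startScope("foo");
        MetricsFactory.getMetricsInstance().endScope("foo");
        MetricsFactory.deInit();
      }
    }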
+- +- assertTrue(fooScope.getNumCounter().longValue() >= 1); +- final long t1 = fooScope.getTimeCounter().longValue(); +- assertTrue(t1 > periodMs); +- +- expectIOE(new Callable<Void>() { +- @Override +- public Void call() throws Exception { +- Metrics.endScope(scopeName); // closing of closed scope not allowed +- return null; +- } +- }); +- +- assertSame(fooScope, Metrics.getScope(scopeName)); +- +- // opening allowed after closing: +- Metrics.startScope(scopeName); +- +- assertTrue(fooScope.getNumCounter().longValue() >= 1); +- assertTrue(fooScope.getTimeCounter().longValue() >= t1); +- +- // opening of already open scope not allowed: +- expectIOE(new Callable<Void>() { +- @Override +- public Void call() throws Exception { +- Metrics.startScope(scopeName); +- return null; +- } +- }); +- +- assertSame(fooScope, Metrics.getScope(scopeName)); +- Thread.sleep(periodMs + 1); +- // Reopening (close + open) allowed in opened state: +- fooScope.reopen(); +- +- assertTrue(fooScope.getNumCounter().longValue() >= 2); +- assertTrue(fooScope.getTimeCounter().longValue() > 2 * periodMs); +- +- Thread.sleep(periodMs + 1); +- // 3rd close: +- fooScope.close(); +- +- assertTrue(fooScope.getNumCounter().longValue() >= 3); +- assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs); +- Double avgT = (Double)Metrics.get("foo.avg_t"); +- assertTrue(avgT.doubleValue() > periodMs); +- } +-} +diff --git a/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java b/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java +new file mode 100644 +index 0000000..8749349 +--- /dev/null ++++ b/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java +@@ -0,0 +1,138 @@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.hadoop.hive.common.metrics.metrics2; ++ ++import com.codahale.metrics.Counter; ++import com.codahale.metrics.MetricRegistry; ++import com.codahale.metrics.Timer; ++import com.fasterxml.jackson.databind.JsonNode; ++import com.fasterxml.jackson.databind.ObjectMapper; ++import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; ++import org.apache.hadoop.hive.conf.HiveConf; ++import org.apache.hadoop.hive.shims.ShimLoader; ++import org.junit.After; ++import org.junit.Assert; ++import org.junit.Before; ++import org.junit.Test; ++ ++import java.io.File; ++import java.nio.file.Files; ++import java.nio.file.Paths; ++import java.util.concurrent.Callable; ++import java.util.concurrent.ExecutorService; ++import java.util.concurrent.Executors; ++import java.util.concurrent.TimeUnit; ++ ++import static org.junit.Assert.assertTrue; ++ ++/** ++ * Unit test for new Metrics subsystem.
++ */ ++public class TestCodahaleMetrics { ++ ++ private static File workDir = new File(System.getProperty("test.tmp.dir")); ++ private static File jsonReportFile; ++ public static MetricRegistry metricRegistry; ++ ++ @Before ++ public void before() throws Exception { ++ HiveConf conf = new HiveConf(); ++ ++ jsonReportFile = new File(workDir, "json_reporting"); ++ jsonReportFile.delete(); ++ String defaultFsName = ShimLoader.getHadoopShims().getHadoopConfNames().get("HADOOPFS"); ++ conf.set(defaultFsName, "local"); ++ conf.setVar(HiveConf.ConfVars.HIVE_METRICS_CLASS, CodahaleMetrics.class.getCanonicalName()); ++ conf.setVar(HiveConf.ConfVars.HIVE_METRICS_REPORTER, MetricsReporting.JSON_FILE.name() + "," + MetricsReporting.JMX.name()); ++ conf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_LOCATION, jsonReportFile.toString()); ++ conf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_INTERVAL, "100ms"); ++ ++ MetricsFactory.init(conf); ++ metricRegistry = ((CodahaleMetrics) MetricsFactory.getMetricsInstance()).getMetricRegistry(); ++ } ++ ++ @After ++ public void after() throws Exception { ++ MetricsFactory.deInit(); ++ } ++ ++ @Test ++ public void testScope() throws Exception { ++ int runs = 5; ++ for (int i = 0; i < runs; i++) { ++ MetricsFactory.getMetricsInstance().startScope("method1"); ++ MetricsFactory.getMetricsInstance().endScope("method1"); ++ } ++ ++ Timer timer = metricRegistry.getTimers().get("api_method1"); ++ Assert.assertEquals(5, timer.getCount()); ++ Assert.assertTrue(timer.getMeanRate() > 0); ++ } ++ ++ ++ @Test ++ public void testCount() throws Exception { ++ int runs = 5; ++ for (int i = 0; i < runs; i++) { ++ MetricsFactory.getMetricsInstance().incrementCounter("count1"); ++ } ++ Counter counter = metricRegistry.getCounters().get("count1"); ++ Assert.assertEquals(5L, counter.getCount()); ++ } ++ ++ @Test ++ public void testConcurrency() throws Exception { ++ int threads = 4; ++ ExecutorService executorService = Executors.newFixedThreadPool(threads); ++ for (int i = 0; i < threads; i++) { ++ final int n = i; ++ executorService.submit(new Callable<Void>() { ++ @Override ++ public Void call() throws Exception { ++ MetricsFactory.getMetricsInstance().startScope("method2"); ++ MetricsFactory.getMetricsInstance().endScope("method2"); ++ return null; ++ } ++ }); ++ } ++ executorService.shutdown(); ++ assertTrue(executorService.awaitTermination(10000, TimeUnit.MILLISECONDS)); ++ Timer timer = metricRegistry.getTimers().get("api_method2"); ++ Assert.assertEquals(4, timer.getCount()); ++ Assert.assertTrue(timer.getMeanRate() > 0); ++ } ++ ++ @Test ++ public void testFileReporting() throws Exception { ++ int runs = 5; ++ for (int i = 0; i < runs; i++) { ++ MetricsFactory.getMetricsInstance().incrementCounter("count2"); ++ Thread.sleep(100); ++ } ++ ++ Thread.sleep(2000); ++ byte[] jsonData = Files.readAllBytes(Paths.get(jsonReportFile.getAbsolutePath())); ++ ObjectMapper objectMapper = new ObjectMapper(); ++ ++ JsonNode rootNode = objectMapper.readTree(jsonData); ++ JsonNode countersNode = rootNode.path("counters"); ++ JsonNode methodCounterNode = countersNode.path("count2"); ++ JsonNode countNode = methodCounterNode.path("count"); ++ Assert.assertEquals(5, countNode.asInt()); ++ } ++} +diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java +new file mode 100644 +index 0000000..25f34d1 +--- /dev/null ++++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java +@@ -0,0 +1,94 @@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.hadoop.hive.metastore; ++ ++import com.fasterxml.jackson.databind.JsonNode; ++import com.fasterxml.jackson.databind.ObjectMapper; ++import junit.framework.TestCase; ++import org.apache.hadoop.hive.cli.CliSessionState; ++import org.apache.hadoop.hive.common.metrics.metrics2.MetricsReporting; ++import org.apache.hadoop.hive.conf.HiveConf; ++import org.apache.hadoop.hive.ql.Driver; ++import org.apache.hadoop.hive.ql.session.SessionState; ++import org.apache.hadoop.hive.shims.ShimLoader; ++import org.junit.After; ++import org.junit.AfterClass; ++import org.junit.Assert; ++import org.junit.Before; ++import org.junit.BeforeClass; ++import org.junit.Test; ++ ++import java.io.File; ++import java.io.IOException; ++import java.nio.file.Files; ++import java.nio.file.Paths; ++ ++/** ++ * Tests Hive Metastore Metrics. ++ */ ++public class TestMetaStoreMetrics { ++ ++ private static File workDir = new File(System.getProperty("test.tmp.dir")); ++ private static File jsonReportFile; ++ ++ private static HiveConf hiveConf; ++ private static Driver driver; ++ ++ ++ @Before ++ public void before() throws Exception { ++ ++ int port = MetaStoreUtils.findFreePort(); ++ ++ jsonReportFile = new File(workDir, "json_reporting"); ++ jsonReportFile.delete(); ++ ++ hiveConf = new HiveConf(TestMetaStoreMetrics.class); ++ hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + port); ++ hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3); ++ hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_METRICS, true); ++ hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false); ++ hiveConf.setVar(HiveConf.ConfVars.HIVE_METRICS_REPORTER, MetricsReporting.JSON_FILE.name() + "," + MetricsReporting.JMX.name()); ++ hiveConf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_LOCATION, jsonReportFile.toString()); ++ hiveConf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_INTERVAL, "100ms"); ++ ++ MetaStoreUtils.startMetaStore(port, ShimLoader.getHadoopThriftAuthBridge(), hiveConf); ++ ++ SessionState.start(new CliSessionState(hiveConf)); ++ driver = new Driver(hiveConf); ++ } ++ ++ @Test ++ public void testMetricsFile() throws Exception { ++ driver.run("show databases"); ++ ++ //give timer thread a chance to print the metrics ++ Thread.sleep(2000); ++ ++ //As the file is being written, try a few times. ++ //This can be replaced by CodahaleMetrics's JsonServlet reporter once it is exposed. 
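Aside: the comment above promises to "try a few times", but the read that follows below is a single Files.readAllBytes call. A small polling helper along the following lines would honor that comment. It is an illustration only, not part of the patch, and pollJsonCount is an invented name; it reuses the Jackson and NIO imports the test already declares:

    // Illustration: poll the JSON report until the metric appears, since the
    // JsonFileReporter rewrites the file on an interval and a read can race it.
    private static JsonNode pollJsonCount(File reportFile, String section, String metric,
        int attempts) throws InterruptedException {
      ObjectMapper mapper = new ObjectMapper();
      for (int i = 0; i < attempts; i++) {
        try {
          byte[] data = Files.readAllBytes(Paths.get(reportFile.getAbsolutePath()));
          JsonNode node = mapper.readTree(data).path(section).path(metric).path("count");
          if (!node.isMissingNode()) {
            return node;
          }
        } catch (IOException e) {
          // File not written yet or mid-rename; fall through and retry.
        }
        Thread.sleep(100);
      }
      throw new AssertionError(section + "." + metric + " never appeared in " + reportFile);
    }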
++ byte[] jsonData = Files.readAllBytes(Paths.get(jsonReportFile.getAbsolutePath())); ++ ObjectMapper objectMapper = new ObjectMapper(); ++ ++ JsonNode rootNode = objectMapper.readTree(jsonData); ++ JsonNode countersNode = rootNode.path("timers"); ++ JsonNode methodCounterNode = countersNode.path("api_get_all_databases"); ++ JsonNode countNode = methodCounterNode.path("count"); ++ Assert.assertTrue(countNode.asInt() > 0); ++ } ++} +diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +index d81c856..1688920 100644 +--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java ++++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +@@ -18,39 +18,14 @@ + + package org.apache.hadoop.hive.metastore; + +-import static org.apache.commons.lang.StringUtils.join; +-import static org.apache.hadoop.hive.metastore.MetaStoreUtils.DEFAULT_DATABASE_COMMENT; +-import static org.apache.hadoop.hive.metastore.MetaStoreUtils.DEFAULT_DATABASE_NAME; +-import static org.apache.hadoop.hive.metastore.MetaStoreUtils.validateName; +- +-import java.io.IOException; +-import java.text.DateFormat; +-import java.text.SimpleDateFormat; +-import java.util.AbstractMap; +-import java.util.ArrayList; +-import java.util.Arrays; +-import java.util.Collections; +-import java.util.Formatter; +-import java.util.HashMap; +-import java.util.HashSet; +-import java.util.Iterator; +-import java.util.LinkedHashMap; +-import java.util.LinkedList; +-import java.util.List; +-import java.util.Map; +-import java.util.Map.Entry; +-import java.util.Properties; +-import java.util.Set; +-import java.util.Timer; +-import java.util.concurrent.TimeUnit; +-import java.util.concurrent.atomic.AtomicBoolean; +-import java.util.concurrent.locks.Condition; +-import java.util.concurrent.locks.Lock; +-import java.util.concurrent.locks.ReentrantLock; +-import java.util.regex.Pattern; +- +-import javax.jdo.JDOException; +- ++import com.facebook.fb303.FacebookBase; ++import com.facebook.fb303.fb_status; ++import com.google.common.annotations.VisibleForTesting; ++import com.google.common.base.Splitter; ++import com.google.common.collect.ImmutableList; ++import com.google.common.collect.ImmutableListMultimap; ++import com.google.common.collect.Lists; ++import com.google.common.collect.Multimaps; + import org.apache.commons.cli.OptionBuilder; + import org.apache.commons.logging.Log; + import org.apache.commons.logging.LogFactory; +@@ -58,12 +33,13 @@ + import org.apache.hadoop.fs.FileSystem; + import org.apache.hadoop.fs.Path; + import org.apache.hadoop.hive.common.FileUtils; ++import org.apache.hadoop.hive.common.JvmPauseMonitor; + import org.apache.hadoop.hive.common.LogUtils; + import org.apache.hadoop.hive.common.LogUtils.LogInitializationException; + import org.apache.hadoop.hive.common.classification.InterfaceAudience; + import org.apache.hadoop.hive.common.classification.InterfaceStability; + import org.apache.hadoop.hive.common.cli.CommonCliOptions; +-import org.apache.hadoop.hive.common.metrics.Metrics; ++import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; + import org.apache.hadoop.hive.conf.HiveConf; + import org.apache.hadoop.hive.conf.HiveConf.ConfVars; + import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; +@@ -221,14 +197,35 @@ + import org.apache.thrift.transport.TTransport; + import org.apache.thrift.transport.TTransportFactory; + +-import com.facebook.fb303.FacebookBase; +-import 
com.facebook.fb303.fb_status; +-import com.google.common.annotations.VisibleForTesting; +-import com.google.common.base.Splitter; +-import com.google.common.collect.ImmutableList; +-import com.google.common.collect.ImmutableListMultimap; +-import com.google.common.collect.Lists; +-import com.google.common.collect.Multimaps; ++import javax.jdo.JDOException; ++import java.io.IOException; ++import java.text.DateFormat; ++import java.text.SimpleDateFormat; ++import java.util.AbstractMap; ++import java.util.ArrayList; ++import java.util.Arrays; ++import java.util.Collections; ++import java.util.Formatter; ++import java.util.HashMap; ++import java.util.HashSet; ++import java.util.Iterator; ++import java.util.LinkedHashMap; ++import java.util.LinkedList; ++import java.util.List; ++import java.util.Map; ++import java.util.Map.Entry; ++import java.util.Properties; ++import java.util.Set; ++import java.util.Timer; ++import java.util.concurrent.TimeUnit; ++import java.util.concurrent.atomic.AtomicBoolean; ++import java.util.concurrent.locks.Condition; ++import java.util.concurrent.locks.Lock; ++import java.util.concurrent.locks.ReentrantLock; ++import java.util.regex.Pattern; ++ ++import static org.apache.commons.lang.StringUtils.join; ++import static org.apache.hadoop.hive.metastore.MetaStoreUtils.*; + + /** + * TODO:pc remove application logic to a separate interface. +@@ -464,9 +461,10 @@ public void init() throws MetaException { + } + } + +- if (hiveConf.getBoolean("hive.metastore.metrics.enabled", false)) { ++ //Start Metrics for Embedded mode ++ if (hiveConf.getBoolVar(ConfVars.METASTORE_METRICS)) { + try { +- Metrics.init(); ++ MetricsFactory.init(hiveConf); + } catch (Exception e) { + // log exception, but ignore inability to start + LOG.error("error in Metrics init: " + e.getClass().getName() + " " +@@ -750,11 +748,13 @@ private String startFunction(String function, String extraLogInfo) { + incrementCounter(function); + logInfo((getIpAddress() == null ? 
"" : "source:" + getIpAddress() + " ") + + function + extraLogInfo); +- try { +- Metrics.startScope(function); +- } catch (IOException e) { +- LOG.debug("Exception when starting metrics scope" ++ if (hiveConf.getBoolVar(ConfVars.METASTORE_METRICS)) { ++ try { ++ MetricsFactory.getMetricsInstance().startScope(function); ++ } catch (IOException e) { ++ LOG.debug("Exception when starting metrics scope" + + e.getClass().getName() + " " + e.getMessage(), e); ++ } + } + return function; + } +@@ -792,10 +792,12 @@ private void endFunction(String function, boolean successful, Exception e, + } + + private void endFunction(String function, MetaStoreEndFunctionContext context) { +- try { +- Metrics.endScope(function); +- } catch (IOException e) { +- LOG.debug("Exception when closing metrics scope" + e); ++ if (hiveConf.getBoolVar(ConfVars.METASTORE_METRICS)) { ++ try { ++ MetricsFactory.getMetricsInstance().endScope(function); ++ } catch (IOException e) { ++ LOG.debug("Exception when closing metrics scope" + e); ++ } + } + + for (MetaStoreEndFunctionListener listener : endFunctionListeners) { +@@ -819,6 +821,14 @@ public void shutdown() { + threadLocalMS.remove(); + } + } ++ if (hiveConf.getBoolVar(ConfVars.METASTORE_METRICS)) { ++ try { ++ MetricsFactory.deInit(); ++ } catch (Exception e) { ++ LOG.error("error in Metrics deinit: " + e.getClass().getName() + " " ++ + e.getMessage(), e); ++ } ++ } + logInfo("Metastore shutdown complete."); + } + +@@ -5901,6 +5911,16 @@ public void run() { + } + }); + ++ //Start Metrics for Standalone (Remote) Mode ++ if (conf.getBoolVar(ConfVars.METASTORE_METRICS)) { ++ try { ++ MetricsFactory.init(conf); ++ } catch (Exception e) { ++ // log exception, but ignore inability to start ++ LOG.error("error in Metrics init: " + e.getClass().getName() + " " ++ + e.getMessage(), e); ++ } ++ } + + Lock startLock = new ReentrantLock(); + Condition startCondition = startLock.newCondition(); +@@ -6091,7 +6111,13 @@ public void run() { + // Wrap the start of the threads in a catch Throwable loop so that any failures + // don't doom the rest of the metastore. + startLock.lock(); +- ShimLoader.getHadoopShims().startPauseMonitor(conf); ++ try { ++ JvmPauseMonitor pauseMonitor = new JvmPauseMonitor(conf); ++ pauseMonitor.start(); ++ } catch (Throwable t) { ++ LOG.warn("Could not initiate the JvmPauseMonitor thread." 
+ " GCs and Pauses may not be " + ++ "warned upon.", t); ++ } + + try { + // Per the javadocs on Condition, do not depend on the condition alone as a start gate +diff --git a/pom.xml b/pom.xml +index b21d894..35133f2 100644 +--- a/pom.xml ++++ b/pom.xml +@@ -116,6 +116,7 @@ + 1.5.4 + 1.4 + 10.11.1.1 ++ 3.1.0 + 14.0.1 + 2.1.6 + 1.2.1 +@@ -128,6 +129,8 @@ + 4.4 + 2.4.0 + 1.9.2 ++ ++ 2.4.2 + 0.3.2 + 5.5.1 + 3.0.1 +diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java +index 58e8e49..7820ed5 100644 +--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java ++++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java +@@ -42,14 +42,15 @@ + import org.apache.curator.framework.api.CuratorEventType; + import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode; + import org.apache.curator.retry.ExponentialBackoffRetry; ++import org.apache.hadoop.hive.common.JvmPauseMonitor; + import org.apache.hadoop.hive.common.LogUtils; + import org.apache.hadoop.hive.common.LogUtils.LogInitializationException; ++import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; + import org.apache.hadoop.hive.conf.HiveConf; + import org.apache.hadoop.hive.conf.HiveConf.ConfVars; + import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl; + import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; + import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper; +-import org.apache.hadoop.hive.shims.ShimLoader; + import org.apache.hadoop.hive.shims.Utils; + import org.apache.hadoop.security.UserGroupInformation; + import org.apache.hive.common.util.HiveStringUtils; +@@ -305,6 +306,15 @@ public synchronized void stop() { + LOG.info("Shutting down HiveServer2"); + HiveConf hiveConf = this.getHiveConf(); + super.stop(); ++ // Shutdown Metrics ++ if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_METRICS_ENABLED)) { ++ try { ++ MetricsFactory.getMetricsInstance().deInit(); ++ } catch (Exception e) { ++ LOG.error("error in Metrics deinit: " + e.getClass().getName() + " " ++ + e.getMessage(), e); ++ } ++ } + // Remove this server instance from ZooKeeper if dynamic service discovery is set + if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) { + try { +@@ -344,7 +354,18 @@ private static void startHiveServer2() throws Throwable { + server = new HiveServer2(); + server.init(hiveConf); + server.start(); +- ShimLoader.getHadoopShims().startPauseMonitor(hiveConf); ++ ++ if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_METRICS_ENABLED)) { ++ MetricsFactory.getMetricsInstance().init(hiveConf); ++ } ++ try { ++ JvmPauseMonitor pauseMonitor = new JvmPauseMonitor(hiveConf); ++ pauseMonitor.start(); ++ } catch (Throwable t) { ++ LOG.warn("Could not initiate the JvmPauseMonitor thread." + " GCs and Pauses may not be " + ++ "warned upon.", t); ++ } ++ + // If we're supporting dynamic service discovery, we'll add the service uri for this + // HiveServer2 instance to Zookeeper as a znode. 
+ if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) { +diff --git a/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java b/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java +index 6d8166c..ffffcb7 100644 +--- a/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java ++++ b/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java +@@ -159,11 +159,6 @@ public TaskAttemptID newTaskAttemptID(JobID jobId, boolean isMap, int taskId, in + } + + @Override +- public void startPauseMonitor(Configuration conf) { +- /* no supported */ +- } +- +- @Override + public boolean isLocalMode(Configuration conf) { + return "local".equals(getJobLauncherRpcAddress(conf)); + } +diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java +index 19324b8..5ddab98 100644 +--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java ++++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java +@@ -212,19 +212,6 @@ public TaskAttemptID newTaskAttemptID(JobID jobId, boolean isMap, int taskId, in + } + + @Override +- public void startPauseMonitor(Configuration conf) { +- try { +- Class.forName("org.apache.hadoop.util.JvmPauseMonitor"); +- org.apache.hadoop.util.JvmPauseMonitor pauseMonitor = new org.apache.hadoop.util +- .JvmPauseMonitor(conf); +- pauseMonitor.start(); +- } catch (Throwable t) { +- LOG.warn("Could not initiate the JvmPauseMonitor thread." + " GCs and Pauses may not be " + +- "warned upon.", t); +- } +- } +- +- @Override + public boolean isLocalMode(Configuration conf) { + return "local".equals(conf.get("mapreduce.framework.name")); + } +diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java +index 5a6bc44..5b7e7f6 100644 +--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java ++++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java +@@ -146,8 +146,6 @@ MiniDFSShim getMiniDfs(Configuration conf, + + public JobContext newJobContext(Job job); + +- public void startPauseMonitor(Configuration conf); +- + /** + * Check wether MR is configured to run in local-mode + * @param conf diff --git a/testutils/ptest2/src/test/resources/HIVE-11271.4.patch b/testutils/ptest2/src/test/resources/HIVE-11271.4.patch new file mode 100644 index 0000000..4a07c37 --- /dev/null +++ b/testutils/ptest2/src/test/resources/HIVE-11271.4.patch @@ -0,0 +1,606 @@ +diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java +index c076d4e112a7edab2106f11fe6224247887313cf..8bcb464de540eda7c14a8c6783bb19a09071af7b 100644 +--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java ++++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java +@@ -25,7 +25,9 @@ + + import org.apache.hadoop.hive.ql.exec.ColumnInfo; + import org.apache.hadoop.hive.ql.exec.CommonJoinOperator; ++import org.apache.hadoop.hive.ql.exec.FilterOperator; + import org.apache.hadoop.hive.ql.exec.Operator; ++import org.apache.hadoop.hive.ql.exec.OperatorFactory; + import org.apache.hadoop.hive.ql.exec.RowSchema; + import org.apache.hadoop.hive.ql.exec.SelectOperator; + import org.apache.hadoop.hive.ql.exec.UnionOperator; +@@ -33,6 +35,7 
@@ + import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; + import org.apache.hadoop.hive.ql.parse.ParseContext; + import org.apache.hadoop.hive.ql.parse.SemanticException; ++import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; + import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; + import org.apache.hadoop.hive.ql.plan.OperatorDesc; + import org.apache.hadoop.hive.ql.plan.SelectDesc; +@@ -241,4 +244,65 @@ public ParseContext getParseContext() { + } + return columns; + } ++ ++ /** ++ * If the input filter operator has direct child(ren) which are union operators, ++ * and the filter's columns are not the same as the union's, ++ * create a select operator between them. The select operator has the same number of columns as ++ * the pruned child operator. ++ * ++ * @param curOp ++ * The filter operator whose children need to be handled. ++ * @throws SemanticException ++ */ ++ public void handleFilterUnionChildren(Operator<? extends OperatorDesc> curOp) ++ throws SemanticException { ++ if (curOp.getChildOperators() == null || !(curOp instanceof FilterOperator)) { ++ return; ++ } ++ List<String> parentPrunList = prunedColLists.get(curOp); ++ if (parentPrunList == null || parentPrunList.size() == 0) { ++ return; ++ } ++ FilterOperator filOp = (FilterOperator) curOp; ++ List<String> prunList = null; ++ List[] childToParentIndex = null; ++ ++ for (Operator<? extends OperatorDesc> child : curOp.getChildOperators()) { ++ if (child instanceof UnionOperator) { ++ prunList = prunedColLists.get(child); ++ if (prunList == null || prunList.size() == 0 || parentPrunList.size() == prunList.size()) { ++ continue; ++ } ++ ++ ArrayList<ExprNodeDesc> exprs = new ArrayList<ExprNodeDesc>(); ++ ArrayList<String> outputColNames = new ArrayList<String>(); ++ Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>(); ++ ArrayList<ColumnInfo> outputRS = new ArrayList<ColumnInfo>(); ++ for (ColumnInfo colInfo : child.getSchema().getSignature()) { ++ if (!prunList.contains(colInfo.getInternalName())) { ++ continue; ++ } ++ ExprNodeDesc colDesc = new ExprNodeColumnDesc(colInfo.getType(), ++ colInfo.getInternalName(), colInfo.getTabAlias(), colInfo.getIsVirtualCol()); ++ exprs.add(colDesc); ++ outputColNames.add(colInfo.getInternalName()); ++ ColumnInfo newCol = new ColumnInfo(colInfo.getInternalName(), colInfo.getType(), ++ colInfo.getTabAlias(), colInfo.getIsVirtualCol(), colInfo.isHiddenVirtualCol()); ++ newCol.setAlias(colInfo.getAlias()); ++ outputRS.add(newCol); ++ colExprMap.put(colInfo.getInternalName(), colDesc); ++ } ++ SelectDesc select = new SelectDesc(exprs, outputColNames, false); ++ curOp.removeChild(child); ++ SelectOperator sel = (SelectOperator) OperatorFactory.getAndMakeChild( ++ select, new RowSchema(outputRS), curOp); ++ OperatorFactory.makeChild(sel, child); ++ sel.setColumnExprMap(colExprMap); ++ ++ } ++ ++ } ++ } ++ + } +diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java +index ac4236c53adf6fa36ad43b2e9029d335f12efde2..2dc15f9f0ae96bdc7f33f3d97ad41c88117734d0 100644 +--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java ++++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java +@@ -108,7 +108,7 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, + filterOpPrunedColListsOrderPreserved); + + pruneOperator(cppCtx, op, cppCtx.getPrunedColLists().get(op)); +- ++ cppCtx.handleFilterUnionChildren(op); + return null; + } + } +diff --git a/ql/src/test/queries/clientpositive/unionall_unbalancedppd.q b/ql/src/test/queries/clientpositive/unionall_unbalancedppd.q +new file mode 100644 +index
0000000000000000000000000000000000000000..0825c2d94d0f9815c7ff88549c77662e53adf928 +--- /dev/null ++++ b/ql/src/test/queries/clientpositive/unionall_unbalancedppd.q +@@ -0,0 +1,120 @@ ++set hive.optimize.ppd=true; ++drop table if exists union_all_bug_test_1; ++drop table if exists union_all_bug_test_2; ++create table if not exists union_all_bug_test_1 ++( ++f1 int, ++f2 int ++); ++ ++create table if not exists union_all_bug_test_2 ++( ++f1 int ++); ++ ++explain SELECT f1 ++FROM ( ++ ++SELECT ++f1 ++, if('helloworld' like '%hello%' ,f1,f2) as filter ++FROM union_all_bug_test_1 ++ ++union all ++ ++select ++f1 ++, 0 as filter ++from union_all_bug_test_2 ++) A ++WHERE (filter = 1); ++ ++SELECT f1 ++FROM ( ++ ++SELECT ++f1 ++, if('helloworld' like '%hello%' ,f1,f2) as filter ++FROM union_all_bug_test_1 ++ ++union all ++ ++select ++f1 ++, 0 as filter ++from union_all_bug_test_2 ++) A ++WHERE (filter = 1); ++ ++insert into table union_all_bug_test_1 values (1,1); ++insert into table union_all_bug_test_2 values (1); ++insert into table union_all_bug_test_1 values (0,0); ++insert into table union_all_bug_test_2 values (0); ++ ++SELECT f1 ++FROM ( ++ ++SELECT ++f1 ++, if('helloworld' like '%hello%' ,f1,f2) as filter ++FROM union_all_bug_test_1 ++ ++union all ++ ++select ++f1 ++, 0 as filter ++from union_all_bug_test_2 ++) A ++WHERE (filter = 1); ++ ++SELECT f1 ++FROM ( ++ ++SELECT ++f1 ++, if('helloworld' like '%hello%' ,f1,f2) as filter ++FROM union_all_bug_test_1 ++ ++union all ++ ++select ++f1 ++, 0 as filter ++from union_all_bug_test_2 ++) A ++WHERE (filter = 0); ++ ++SELECT f1 ++FROM ( ++ ++SELECT ++f1 ++, if('helloworld' like '%hello%' ,f1,f2) as filter ++FROM union_all_bug_test_1 ++ ++union all ++ ++select ++f1 ++, 0 as filter ++from union_all_bug_test_2 ++) A ++WHERE (filter = 1 or filter = 0); ++ ++SELECT f1 ++FROM ( ++ ++SELECT ++f1 ++, if('helloworld' like '%hello%' ,f1,f2) as filter ++FROM union_all_bug_test_1 ++ ++union all ++ ++select ++f1 ++, 0 as filter ++from union_all_bug_test_2 ++) A ++WHERE (f1 = 1); +diff --git a/ql/src/test/results/clientpositive/unionall_unbalancedppd.q.out b/ql/src/test/results/clientpositive/unionall_unbalancedppd.q.out +new file mode 100644 +index 0000000000000000000000000000000000000000..46828e9db772f20b47c3ae6aac1239bbcbabd752 +--- /dev/null ++++ b/ql/src/test/results/clientpositive/unionall_unbalancedppd.q.out +@@ -0,0 +1,373 @@ ++PREHOOK: query: drop table if exists union_all_bug_test_1 ++PREHOOK: type: DROPTABLE ++POSTHOOK: query: drop table if exists union_all_bug_test_1 ++POSTHOOK: type: DROPTABLE ++PREHOOK: query: drop table if exists union_all_bug_test_2 ++PREHOOK: type: DROPTABLE ++POSTHOOK: query: drop table if exists union_all_bug_test_2 ++POSTHOOK: type: DROPTABLE ++PREHOOK: query: create table if not exists union_all_bug_test_1 ++( ++f1 int, ++f2 int ++) ++PREHOOK: type: CREATETABLE ++PREHOOK: Output: database:default ++PREHOOK: Output: default@union_all_bug_test_1 ++POSTHOOK: query: create table if not exists union_all_bug_test_1 ++( ++f1 int, ++f2 int ++) ++POSTHOOK: type: CREATETABLE ++POSTHOOK: Output: database:default ++POSTHOOK: Output: default@union_all_bug_test_1 ++PREHOOK: query: create table if not exists union_all_bug_test_2 ++( ++f1 int ++) ++PREHOOK: type: CREATETABLE ++PREHOOK: Output: database:default ++PREHOOK: Output: default@union_all_bug_test_2 ++POSTHOOK: query: create table if not exists union_all_bug_test_2 ++( ++f1 int ++) ++POSTHOOK: type: CREATETABLE ++POSTHOOK: Output: database:default ++POSTHOOK: Output: 
default@union_all_bug_test_2 ++PREHOOK: query: explain SELECT f1 ++FROM ( ++ ++SELECT ++f1 ++, if('helloworld' like '%hello%' ,f1,f2) as filter ++FROM union_all_bug_test_1 ++ ++union all ++ ++select ++f1 ++, 0 as filter ++from union_all_bug_test_2 ++) A ++WHERE (filter = 1) ++PREHOOK: type: QUERY ++POSTHOOK: query: explain SELECT f1 ++FROM ( ++ ++SELECT ++f1 ++, if('helloworld' like '%hello%' ,f1,f2) as filter ++FROM union_all_bug_test_1 ++ ++union all ++ ++select ++f1 ++, 0 as filter ++from union_all_bug_test_2 ++) A ++WHERE (filter = 1) ++POSTHOOK: type: QUERY ++STAGE DEPENDENCIES: ++ Stage-1 is a root stage ++ Stage-0 depends on stages: Stage-1 ++ ++STAGE PLANS: ++ Stage: Stage-1 ++ Map Reduce ++ Map Operator Tree: ++ TableScan ++ alias: union_all_bug_test_1 ++ Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE ++ Filter Operator ++ predicate: (if(true, f1, f2) = 1) (type: boolean) ++ Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE ++ Select Operator ++ expressions: f1 (type: int) ++ outputColumnNames: _col0 ++ Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE ++ Union ++ Statistics: Num rows: 2 Data size: 0 Basic stats: PARTIAL Column stats: NONE ++ File Output Operator ++ compressed: false ++ Statistics: Num rows: 2 Data size: 0 Basic stats: PARTIAL Column stats: NONE ++ table: ++ input format: org.apache.hadoop.mapred.TextInputFormat ++ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat ++ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe ++ TableScan ++ alias: union_all_bug_test_2 ++ Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE ++ Filter Operator ++ predicate: false (type: boolean) ++ Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE ++ Select Operator ++ expressions: f1 (type: int) ++ outputColumnNames: _col0 ++ Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE ++ Union ++ Statistics: Num rows: 2 Data size: 0 Basic stats: PARTIAL Column stats: NONE ++ File Output Operator ++ compressed: false ++ Statistics: Num rows: 2 Data size: 0 Basic stats: PARTIAL Column stats: NONE ++ table: ++ input format: org.apache.hadoop.mapred.TextInputFormat ++ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat ++ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe ++ ++ Stage: Stage-0 ++ Fetch Operator ++ limit: -1 ++ Processor Tree: ++ ListSink ++ ++PREHOOK: query: SELECT f1 ++FROM ( ++ ++SELECT ++f1 ++, if('helloworld' like '%hello%' ,f1,f2) as filter ++FROM union_all_bug_test_1 ++ ++union all ++ ++select ++f1 ++, 0 as filter ++from union_all_bug_test_2 ++) A ++WHERE (filter = 1) ++PREHOOK: type: QUERY ++PREHOOK: Input: default@union_all_bug_test_1 ++PREHOOK: Input: default@union_all_bug_test_2 ++#### A masked pattern was here #### ++POSTHOOK: query: SELECT f1 ++FROM ( ++ ++SELECT ++f1 ++, if('helloworld' like '%hello%' ,f1,f2) as filter ++FROM union_all_bug_test_1 ++ ++union all ++ ++select ++f1 ++, 0 as filter ++from union_all_bug_test_2 ++) A ++WHERE (filter = 1) ++POSTHOOK: type: QUERY ++POSTHOOK: Input: default@union_all_bug_test_1 ++POSTHOOK: Input: default@union_all_bug_test_2 ++#### A masked pattern was here #### ++PREHOOK: query: insert into table union_all_bug_test_1 values (1,1) ++PREHOOK: type: QUERY ++PREHOOK: Input: default@values__tmp__table__1 ++PREHOOK: Output: default@union_all_bug_test_1 ++POSTHOOK: query: insert into table union_all_bug_test_1 values (1,1) 
++POSTHOOK: type: QUERY
++POSTHOOK: Input: default@values__tmp__table__1
++POSTHOOK: Output: default@union_all_bug_test_1
++POSTHOOK: Lineage: union_all_bug_test_1.f1 EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
++POSTHOOK: Lineage: union_all_bug_test_1.f2 EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
++PREHOOK: query: insert into table union_all_bug_test_2 values (1)
++PREHOOK: type: QUERY
++PREHOOK: Input: default@values__tmp__table__2
++PREHOOK: Output: default@union_all_bug_test_2
++POSTHOOK: query: insert into table union_all_bug_test_2 values (1)
++POSTHOOK: type: QUERY
++POSTHOOK: Input: default@values__tmp__table__2
++POSTHOOK: Output: default@union_all_bug_test_2
++POSTHOOK: Lineage: union_all_bug_test_2.f1 EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
++PREHOOK: query: insert into table union_all_bug_test_1 values (0,0)
++PREHOOK: type: QUERY
++PREHOOK: Input: default@values__tmp__table__3
++PREHOOK: Output: default@union_all_bug_test_1
++POSTHOOK: query: insert into table union_all_bug_test_1 values (0,0)
++POSTHOOK: type: QUERY
++POSTHOOK: Input: default@values__tmp__table__3
++POSTHOOK: Output: default@union_all_bug_test_1
++POSTHOOK: Lineage: union_all_bug_test_1.f1 EXPRESSION [(values__tmp__table__3)values__tmp__table__3.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
++POSTHOOK: Lineage: union_all_bug_test_1.f2 EXPRESSION [(values__tmp__table__3)values__tmp__table__3.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
++PREHOOK: query: insert into table union_all_bug_test_2 values (0)
++PREHOOK: type: QUERY
++PREHOOK: Input: default@values__tmp__table__4
++PREHOOK: Output: default@union_all_bug_test_2
++POSTHOOK: query: insert into table union_all_bug_test_2 values (0)
++POSTHOOK: type: QUERY
++POSTHOOK: Input: default@values__tmp__table__4
++POSTHOOK: Output: default@union_all_bug_test_2
++POSTHOOK: Lineage: union_all_bug_test_2.f1 EXPRESSION [(values__tmp__table__4)values__tmp__table__4.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
++PREHOOK: query: SELECT f1
++FROM (
++
++SELECT
++f1
++, if('helloworld' like '%hello%' ,f1,f2) as filter
++FROM union_all_bug_test_1
++
++union all
++
++select
++f1
++, 0 as filter
++from union_all_bug_test_2
++) A
++WHERE (filter = 1)
++PREHOOK: type: QUERY
++PREHOOK: Input: default@union_all_bug_test_1
++PREHOOK: Input: default@union_all_bug_test_2
++#### A masked pattern was here ####
++POSTHOOK: query: SELECT f1
++FROM (
++
++SELECT
++f1
++, if('helloworld' like '%hello%' ,f1,f2) as filter
++FROM union_all_bug_test_1
++
++union all
++
++select
++f1
++, 0 as filter
++from union_all_bug_test_2
++) A
++WHERE (filter = 1)
++POSTHOOK: type: QUERY
++POSTHOOK: Input: default@union_all_bug_test_1
++POSTHOOK: Input: default@union_all_bug_test_2
++#### A masked pattern was here ####
++1
++PREHOOK: query: SELECT f1
++FROM (
++
++SELECT
++f1
++, if('helloworld' like '%hello%' ,f1,f2) as filter
++FROM union_all_bug_test_1
++
++union all
++
++select
++f1
++, 0 as filter
++from union_all_bug_test_2
++) A
++WHERE (filter = 0)
++PREHOOK: type: QUERY
++PREHOOK: Input: default@union_all_bug_test_1
++PREHOOK: Input: default@union_all_bug_test_2
++#### A masked pattern was here ####
++POSTHOOK: query: SELECT f1
++FROM (
++
++SELECT
++f1
++, if('helloworld' like '%hello%' ,f1,f2) as filter
++FROM union_all_bug_test_1
++
++union all
++
++select
++f1
++, 0 as filter
++from union_all_bug_test_2
++) A
++WHERE (filter = 0)
++POSTHOOK: type: QUERY
++POSTHOOK: Input: default@union_all_bug_test_1
++POSTHOOK: Input: default@union_all_bug_test_2
++#### A masked pattern was here ####
++0
++1
++0
++PREHOOK: query: SELECT f1
++FROM (
++
++SELECT
++f1
++, if('helloworld' like '%hello%' ,f1,f2) as filter
++FROM union_all_bug_test_1
++
++union all
++
++select
++f1
++, 0 as filter
++from union_all_bug_test_2
++) A
++WHERE (filter = 1 or filter = 0)
++PREHOOK: type: QUERY
++PREHOOK: Input: default@union_all_bug_test_1
++PREHOOK: Input: default@union_all_bug_test_2
++#### A masked pattern was here ####
++POSTHOOK: query: SELECT f1
++FROM (
++
++SELECT
++f1
++, if('helloworld' like '%hello%' ,f1,f2) as filter
++FROM union_all_bug_test_1
++
++union all
++
++select
++f1
++, 0 as filter
++from union_all_bug_test_2
++) A
++WHERE (filter = 1 or filter = 0)
++POSTHOOK: type: QUERY
++POSTHOOK: Input: default@union_all_bug_test_1
++POSTHOOK: Input: default@union_all_bug_test_2
++#### A masked pattern was here ####
++1
++0
++1
++0
++PREHOOK: query: SELECT f1
++FROM (
++
++SELECT
++f1
++, if('helloworld' like '%hello%' ,f1,f2) as filter
++FROM union_all_bug_test_1
++
++union all
++
++select
++f1
++, 0 as filter
++from union_all_bug_test_2
++) A
++WHERE (f1 = 1)
++PREHOOK: type: QUERY
++PREHOOK: Input: default@union_all_bug_test_1
++PREHOOK: Input: default@union_all_bug_test_2
++#### A masked pattern was here ####
++POSTHOOK: query: SELECT f1
++FROM (
++
++SELECT
++f1
++, if('helloworld' like '%hello%' ,f1,f2) as filter
++FROM union_all_bug_test_1
++
++union all
++
++select
++f1
++, 0 as filter
++from union_all_bug_test_2
++) A
++WHERE (f1 = 1)
++POSTHOOK: type: QUERY
++POSTHOOK: Input: default@union_all_bug_test_1
++POSTHOOK: Input: default@union_all_bug_test_2
++#### A masked pattern was here ####
++1
++1
diff --git a/testutils/ptest2/src/test/resources/HIVE-9377.1.patch b/testutils/ptest2/src/test/resources/HIVE-9377.1.patch
new file mode 100644
index 0000000..9d2d5b6
--- /dev/null
+++ b/testutils/ptest2/src/test/resources/HIVE-9377.1.patch
@@ -0,0 +1,25 @@
+Index: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInFile.java
+IDEA additional info:
+Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
+<+>UTF-8
+===================================================================
+--- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInFile.java	(date 1421263954000)
++++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInFile.java	(revision )
+@@ -140,6 +140,17 @@
+   }
+ 
+   @Override
++  public void copyToNewInstance(Object newInstance) throws UDFArgumentException {
++    super.copyToNewInstance(newInstance); // Asserts the class invariant. (Same types.)
++    GenericUDFInFile that = (GenericUDFInFile)newInstance;
++    if (that != this) {
++      that.set = (this.set == null ? null : (HashSet)this.set.clone());
++      that.strObjectInspector = this.strObjectInspector;
++      that.fileObjectInspector = this.fileObjectInspector;
++    }
++  }
++
++  @Override
+   public String getDisplayString(String[] children) {
+     assert (children.length == 2);
+     return "in_file(" + children[0] + ", " + children[1] + ")";
diff --git a/testutils/ptest2/src/test/resources/remove-test.patch b/testutils/ptest2/src/test/resources/remove-test.patch
new file mode 100644
index 0000000..3eac9d7
--- /dev/null
+++ b/testutils/ptest2/src/test/resources/remove-test.patch
@@ -0,0 +1,33 @@
+diff --git a/ql/src/test/queries/clientpositive/join0.q b/ql/src/test/queries/clientpositive/join0.q
+deleted file mode 100644
+index 6ef6843..0000000
+--- a/ql/src/test/queries/clientpositive/join0.q
++++ /dev/null
+@@ -1,27 +0,0 @@
+-set hive.explain.user=false;
+--- JAVA_VERSION_SPECIFIC_OUTPUT
+--- SORT_QUERY_RESULTS
+-
+-EXPLAIN
+-SELECT src1.key as k1, src1.value as v1,
+-       src2.key as k2, src2.value as v2 FROM
+-  (SELECT * FROM src WHERE src.key < 10) src1
+-    JOIN
+-  (SELECT * FROM src WHERE src.key < 10) src2
+-  SORT BY k1, v1, k2, v2;
+-
+-EXPLAIN FORMATTED
+-SELECT src1.key as k1, src1.value as v1,
+-       src2.key as k2, src2.value as v2 FROM
+-  (SELECT * FROM src WHERE src.key < 10) src1
+-    JOIN
+-  (SELECT * FROM src WHERE src.key < 10) src2
+-  SORT BY k1, v1, k2, v2;
+-
+-SELECT src1.key as k1, src1.value as v1,
+-       src2.key as k2, src2.value as v2 FROM
+-  (SELECT * FROM src WHERE src.key < 10) src1
+-    JOIN
+-  (SELECT * FROM src WHERE src.key < 10) src2
+-  SORT BY k1, v1, k2, v2;
+-