diff --git a/bin/local-master-backup.sh b/bin/local-master-backup.sh index bd265ab..cc2dc56 100755 --- a/bin/local-master-backup.sh +++ b/bin/local-master-backup.sh @@ -40,6 +40,7 @@ run_master () { DN=$2 export HBASE_IDENT_STRING="$USER-$DN" HBASE_MASTER_ARGS="\ + -D hbase.master.port=`expr 16000 + $DN` \ -D hbase.master.info.port=`expr 16010 + $DN` \ -D hbase.regionserver.port=`expr 16020 + $DN` \ -D hbase.regionserver.info.port=`expr 16030 + $DN` \ diff --git a/dev-support/hbase-personality.sh b/dev-support/hbase-personality.sh new file mode 100755 index 0000000..2d31fd5 --- /dev/null +++ b/dev-support/hbase-personality.sh @@ -0,0 +1,345 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# You'll need a local installation of +# [Apache Yetus' precommit checker](http://yetus.apache.org/documentation/0.1.0/#yetus-precommit) +# to use this personality. +# +# Download from: http://yetus.apache.org/downloads/ . You can either grab the source artifact and +# build from it, or use the convenience binaries provided on that download page. +# +# To run against, e.g. HBASE-15074 you'd then do +# ```bash +# test-patch --personality=dev-support/hbase-personality.sh HBASE-15074 +# ``` +# +# If you want to skip the ~1 hour it'll take to do all the hadoop API checks, use +# ```bash +# test-patch --plugins=all,-hadoopcheck --personality=dev-support/hbase-personality.sh HBASE-15074 +# ```` +# +# pass the `--jenkins` flag if you want to allow test-patch to destructively alter local working +# directory / branch in order to have things match what the issue patch requests. + +personality_plugins "all" + +function personality_globals +{ + #shellcheck disable=SC2034 + PROJECT_NAME=hbase + #shellcheck disable=SC2034 + PATCH_BRANCH_DEFAULT=master + #shellcheck disable=SC2034 + JIRA_ISSUE_RE='^HBASE-[0-9]+$' + #shellcheck disable=SC2034 + GITHUB_REPO="apache/hbase" + + # TODO use PATCH_BRANCH to select hadoop versions to use. + # All supported Hadoop versions that we want to test the compilation with + HBASE_HADOOP_VERSIONS="2.4.0 2.4.1 2.5.0 2.5.1 2.5.2 2.6.1 2.6.2 2.6.3 2.7.1" + + # TODO use PATCH_BRANCH to select jdk versions to use. + + # Override the maven options + MAVEN_OPTS="${MAVEN_OPTS:-"-Xmx3100M"}" + +} + +function personality_modules +{ + local repostatus=$1 + local testtype=$2 + local extra="" + + yetus_debug "Personality: ${repostatus} ${testtype}" + + clear_personality_queue + + extra="-DHBasePatchProcess" + + if [[ ${repostatus} == branch + && ${testtype} == mvninstall ]];then + personality_enqueue_module . 
${extra} + return + fi + + if [[ ${testtype} = findbugs ]]; then + for module in ${CHANGED_MODULES}; do + # skip findbugs on hbase-shell + if [[ ${module} == hbase-shell ]]; then + continue + else + # shellcheck disable=SC2086 + personality_enqueue_module ${module} ${extra} + fi + done + return + fi + + if [[ ${testtype} = unit ]]; then + extra="${extra} -PrunAllTests" + + # Inject the jenkins build-id for our surefire invocations + # Used by zombie detection stuff, even though we're not including that yet. + if [ -n "${BUILD_ID}" ]; then + extra="${extra} -Dbuild.id=${BUILD_ID}" + fi + fi + + for module in ${CHANGED_MODULES}; do + # shellcheck disable=SC2086 + personality_enqueue_module ${module} ${extra} + done +} + +################################################### +# Below here are our one-off tests specific to hbase. +# TODO break them into individual files so it's easier to maintain them? + +# TODO line length check? could ignore all java files since checkstyle gets them. + +################################################### + +add_test_type hadoopcheck + +function hadoopcheck_filefilter +{ + local filename=$1 + + if [[ ${filename} =~ \.java$ ]]; then + add_test hadoopcheck + fi +} + +function hadoopcheck_rebuild +{ + local repostatus=$1 + local hadoopver + local logfile + local count + local result=0 + + if [[ "${repostatus}" = branch ]]; then + return 0 + fi + + big_console_header "Compiling against various Hadoop versions" + + export MAVEN_OPTS="${MAVEN_OPTS}" + for hadoopver in ${HBASE_HADOOP_VERSIONS}; do + logfile="${PATCH_DIR}/patch-javac-${hadoopver}.txt" + echo_and_redirect "${logfile}" \ + "${MAVEN}" clean install \ + -DskipTests -DHBasePatchProcess \ + -Dhadoop-two.version="${hadoopver}" + count=$(${GREP} -c ERROR "${logfile}") + if [[ ${count} -gt 0 ]]; then + add_vote_table -1 hadoopcheck "Patch causes ${count} errors with Hadoop v${hadoopver}." + ((result=result+1)) + fi + done + + if [[ ${result} -gt 0 ]]; then + return 1 + fi + + add_vote_table +1 hadoopcheck "Patch does not cause any errors with Hadoop ${HBASE_HADOOP_VERSIONS}." + return 0 +} + +###################################### + +# TODO if we need th protoc check, we probably need to check building all the modules that rely on hbase-protocol +add_test_type hbaseprotoc + +function hbaseprotoc_filefilter +{ + local filename=$1 + + if [[ ${filename} =~ \.proto$ ]]; then + add_test hbaseprotoc + fi +} + +function hbaseprotoc_rebuild +{ + local i=0 + local fn + local module + local logfile + local count + local result + + if [[ "${repostatus}" = branch ]]; then + return 0 + fi + + verify_needed_test hbaseprotoc + if [[ $? == 0 ]]; then + return 0 + fi + + big_console_header "Patch HBase protoc plugin" + + start_clock + + + personality_modules patch hbaseprotoc + modules_workers patch hbaseprotoc compile -DskipTests -Pcompile-protobuf -X -DHBasePatchProcess + + # shellcheck disable=SC2153 + until [[ $i -eq ${#MODULE[@]} ]]; do + if [[ ${MODULE_STATUS[${i}]} == -1 ]]; then + ((result=result+1)) + ((i=i+1)) + continue + fi + module=${MODULE[$i]} + fn=$(module_file_fragment "${module}") + logfile="${PATCH_DIR}/patch-hbaseprotoc-${fn}.txt" + + count=$(${GREP} -c ERROR "${logfile}") + + if [[ ${count} -gt 0 ]]; then + module_status ${i} -1 "patch-hbaseprotoc-${fn}.txt" "Patch generated "\ + "${count} new protoc errors in ${module}." 
+ ((result=result+1)) + fi + ((i=i+1)) + done + + modules_messages patch hbaseprotoc true + if [[ ${result} -gt 0 ]]; then + return 1 + fi + return 0 +} + +###################################### + +add_test_type hbaseanti + +function hbaseanti_filefilter +{ + local filename=$1 + + if [[ ${filename} =~ \.java$ ]]; then + add_test hbaseanti + fi +} + +function hbaseanti_patchfile +{ + local patchfile=$1 + local warnings + local result + + verify_needed_test hbaseanti + if [[ $? == 0 ]]; then + return 0 + fi + + big_console_header "Checking for known anti-patterns" + + start_clock + + warnings=$(${GREP} 'new TreeMap/dev/null; then +# "${BUILDTOOL}_${testtype}_logfilter" "${input}" "${output}" +# elif declare -f ${testtype}_logfilter >/dev/null; then +# "${testtype}_logfilter" "${input}" "${output}" +# fi +# +# start_clock +# if [ -n "${BUILD_ID}" ]; then +# yetus_debug "Checking for zombie test processes." +# processes=$(jps -v | "${GREP}" surefirebooter | "${GREP}" -e "hbase.build.id=${BUILD_ID}") +# if [ -n "${processes}" ] && [ "$(echo "${processes}" | wc -l)" -gt 0 ]; then +# yetus_warn "Found some suspicious process(es). Waiting a bit to see if they're just slow to stop." +# yetus_debug "${processes}" +# sleep 30 +# #shellcheck disable=SC2016 +# for pid in $(echo "${processes}"| ${AWK} '{print $1}'); do +# # Test our zombie still running (and that it still an hbase build item) +# process_output=$(ps -p "${pid}" | tail +2 | "${GREP}" -e "hbase.build.id=${BUILD_ID}") +# if [[ -n "${process_output}" ]]; then +# yetus_error "Zombie: ${process_output}" +# ((zombie_count = zombie_count + 1)) +# zombie_process=$(jstack "${pid}" | "${GREP}" -e "\.Test" | "${GREP}" -e "\.java"| head -3) +# zombies="${zombies} ${zombie_process}" +# fi +# done +# fi +# if [ "${zombie_count}" -ne 0 ]; then +# add_vote_table -1 zombies "There are ${zombie_count} zombie test(s)" +# populate_test_table "zombie unit tests" "${zombies}" +# else +# yetus_info "Zombie check complete. All test runs exited normally." +# stop_clock +# fi +# else +# add_vote_table -0 zombies "There is no BUILD_ID env variable; can't check for zombies." +# fi +# +#} diff --git a/dev-support/test-patch.properties b/dev-support/test-patch.properties deleted file mode 100644 index bc29896..0000000 --- a/dev-support/test-patch.properties +++ /dev/null @@ -1,35 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -MAVEN_OPTS="${MAVEN_OPTS:-"-Xmx3100M"}" - -# The number of acceptable warning for *all* modules -# Please update the per-module test-patch.properties if you update this file. - -OK_RELEASEAUDIT_WARNINGS=0 -# Allow four warnings. Javadoc complains about sun.misc.Unsafe use. -# See HBASE-7457, HBASE-13761 -# Allow 2 additional warnings for Scala stub notice about MR. 
See HBASE-13992 -OK_JAVADOC_WARNINGS=9 - -MAX_LINE_LENGTH=100 - -# All supported branches for testing with precommit build -# branch-1.x should apprear before branch-1 since the latter is a prefix -BRANCH_NAMES="0.94 0.98 branch-1.0 branch-1.1 branch-1.2 branch-1 master hbase-12439 hbase-11339" - -# All supported Hadoop versions that we want to test the compilation with -HADOOP2_VERSIONS="2.4.0 2.4.1 2.5.0 2.5.1 2.5.2 2.6.0 2.6.1 2.7.0 2.7.1" -HADOOP3_VERSIONS="3.0.0-SNAPSHOT" diff --git a/dev-support/test-patch.sh b/dev-support/test-patch.sh deleted file mode 100755 index d0c0346..0000000 --- a/dev-support/test-patch.sh +++ /dev/null @@ -1,1070 +0,0 @@ -#!/usr/bin/env bash -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - - -#set -x - -### Setup some variables. -### GIT_COMMIT and BUILD_URL are set by Hudson if it is run by patch process -### Read variables from properties file -bindir=$(dirname $0) - -# Defaults -if [ -z "$MAVEN_HOME" ]; then - MVN=mvn -else - MVN=$MAVEN_HOME/bin/mvn -fi - -NEWLINE=$'\n' - -PROJECT_NAME=HBase -JENKINS=false -MOVE_PATCH_DIR=true -PATCH_DIR=/tmp -BASEDIR=$(pwd) -BRANCH_NAME="master" - -. $BASEDIR/dev-support/test-patch.properties - -PS=${PS:-ps} -AWK=${AWK:-awk} -WGET=${WGET:-wget} -GREP=${GREP:-grep} -EGREP=${EGREP:-egrep} -PATCH=${PATCH:-patch} -JIRACLI=${JIRA:-jira} -FINDBUGS_HOME=${FINDBUGS_HOME} -FORREST_HOME=${FORREST_HOME} -ECLIPSE_HOME=${ECLIPSE_HOME} -GIT=${GIT:-git} - -############################################################################### -printUsage() { - echo "Usage: $0 [options] patch-file | defect-number" - echo - echo "Where:" - echo " patch-file is a local patch file containing the changes to test" - echo " defect-number is a JIRA defect number (e.g. 
'HADOOP-1234') to test (Jenkins only)" - echo - echo "Options:" - echo "--patch-dir= The directory for working and output files (default '/tmp')" - echo "--basedir= The directory to apply the patch to (default current directory)" - echo "--mvn-cmd= The 'mvn' command to use (default \$MAVEN_HOME/bin/mvn, or 'mvn')" - echo "--ps-cmd= The 'ps' command to use (default 'ps')" - echo "--awk-cmd= The 'awk' command to use (default 'awk')" - echo "--grep-cmd= The 'grep' command to use (default 'grep')" - echo "--patch-cmd= The 'patch' command to use (default 'patch')" - echo "--findbugs-home= Findbugs home directory (default FINDBUGS_HOME environment variable)" - echo "--forrest-home= Forrest home directory (default FORREST_HOME environment variable)" - echo "--dirty-workspace Allow the local workspace to have uncommitted changes" - echo "--git-cmd= The 'git' command to use (default 'git')" - echo - echo "Jenkins-only options:" - echo "--jenkins Run by Jenkins (runs tests and posts results to JIRA)" - echo "--wget-cmd= The 'wget' command to use (default 'wget')" - echo "--jira-cmd= The 'jira' command to use (default 'jira')" - echo "--jira-password= The password for the 'jira' command" - echo "--eclipse-home= Eclipse home directory (default ECLIPSE_HOME environment variable)" -} - -############################################################################### -parseArgs() { - for i in $* - do - case $i in - --jenkins) - JENKINS=true - ;; - --no-move-patch-dir) - MOVE_PATCH_DIR=false - ;; - --patch-dir=*) - PATCH_DIR=${i#*=} - ;; - --basedir=*) - BASEDIR=${i#*=} - ;; - --mvn-cmd=*) - MVN=${i#*=} - ;; - --ps-cmd=*) - PS=${i#*=} - ;; - --awk-cmd=*) - AWK=${i#*=} - ;; - --wget-cmd=*) - WGET=${i#*=} - ;; - --grep-cmd=*) - GREP=${i#*=} - ;; - --patch-cmd=*) - PATCH=${i#*=} - ;; - --jira-cmd=*) - JIRACLI=${i#*=} - ;; - --jira-password=*) - JIRA_PASSWD=${i#*=} - ;; - --findbugs-home=*) - FINDBUGS_HOME=${i#*=} - ;; - --forrest-home=*) - FORREST_HOME=${i#*=} - ;; - --eclipse-home=*) - ECLIPSE_HOME=${i#*=} - ;; - --dirty-workspace) - DIRTY_WORKSPACE=true - ;; - --git-cmd=*) - GIT=${i#*=} - ;; - *) - PATCH_OR_DEFECT=$i - ;; - esac - done - if [ -z "$PATCH_OR_DEFECT" ]; then - printUsage - exit 1 - fi - if [[ $JENKINS == "true" ]] ; then - echo "Running in Jenkins mode" - defect=$PATCH_OR_DEFECT - ECLIPSE_PROPERTY="-Declipse.home=$ECLIPSE_HOME" - else - echo "Running in developer mode" - JENKINS=false - ### PATCH_FILE contains the location of the patchfile - PATCH_FILE=$PATCH_OR_DEFECT - if [[ ! -e "$PATCH_FILE" ]] ; then - echo "Unable to locate the patch file $PATCH_FILE" - cleanupAndExit 0 - fi - ### Check if $PATCH_DIR exists. If it does not exist, create a new directory - if [[ ! -e "$PATCH_DIR" ]] ; then - mkdir "$PATCH_DIR" - if [[ $? == 0 ]] ; then - echo "$PATCH_DIR has been created" - else - echo "Unable to create $PATCH_DIR" - cleanupAndExit 0 - fi - fi - ### Obtain the patch filename to append it to the version number - defect=`basename $PATCH_FILE` - fi -} - -############################################################################### -checkout () { - echo "" - echo "" - echo "======================================================================" - echo "======================================================================" - echo " Testing patch for ${defect}." 
- echo "======================================================================" - echo "======================================================================" - echo "" - echo "" - ### When run by a developer, if the workspace contains modifications, do not continue - ### unless the --dirty-workspace option was set - if [[ $JENKINS == "false" ]] ; then - if [[ -z $DIRTY_WORKSPACE ]] ; then - # Ref http://stackoverflow.com/a/2659808 for details on checking dirty status - ${GIT} diff-index --quiet HEAD - if [[ $? -ne 0 ]] ; then - uncommitted=`${GIT} diff --name-only HEAD` - uncommitted="You have the following files with uncommitted changes:${NEWLINE}${uncommitted}" - fi - untracked="$(${GIT} ls-files --exclude-standard --others)" && test -z "${untracked}" - if [[ $? -ne 0 ]] ; then - untracked="You have untracked and unignored files:${NEWLINE}${untracked}" - fi - if [[ $uncommitted || $untracked ]] ; then - echo "ERROR: can't run in a workspace that contains modifications." - echo "Pass the '--dirty-workspace' flag to bypass." - echo "" - echo "${uncommitted}" - echo "" - echo "${untracked}" - cleanupAndExit 1 - fi - fi - echo - fi - return $? -} - -findBranchNameFromPatchName() { - local patchName=$1 - for LOCAL_BRANCH_NAME in $BRANCH_NAMES; do - if [[ $patchName =~ /jira/secure/attachment/[0-9]*/.*$LOCAL_BRANCH_NAME ]]; then - BRANCH_NAME=$LOCAL_BRANCH_NAME - break - fi - done - return 0 -} - -checkoutBranch() { - echo "" - echo "" - echo "======================================================================" - echo "======================================================================" - echo " Testing patch on branch ${BRANCH_NAME}." - echo "======================================================================" - echo "======================================================================" - echo "" - echo "" - if [[ $JENKINS == "true" ]] ; then - if [[ "$BRANCH_NAME" != "master" ]]; then - echo "origin/${BRANCH_NAME} HEAD is commit `${GIT} rev-list origin/${BRANCH_NAME} -1`" - echo "${GIT} checkout -f `${GIT} rev-list origin/${BRANCH_NAME} -1`" - ${GIT} checkout -f `${GIT} rev-list origin/${BRANCH_NAME} -1` - echo "${GIT} status" - ${GIT} status - fi - fi -} - -############################################################################### -### Collect findbugs reports -collectFindbugsReports() { - name=$1 - basedir=$2 - patch_dir=$3 - for file in $(find $basedir -name findbugsXml.xml) - do - relative_file=${file#$basedir/} # strip leading $basedir prefix - if [ ! 
$relative_file == "target/findbugsXml.xml" ]; then - module_suffix=${relative_file%/target/findbugsXml.xml} # strip trailing path - module_suffix=`basename ${module_suffix}` - fi - - cp $file $patch_dir/${name}FindbugsWarnings${module_suffix}.xml - $FINDBUGS_HOME/bin/setBugDatabaseInfo -name $name \ - $patch_dir/${name}FindbugsWarnings${module_suffix}.xml \ - $patch_dir/${name}FindbugsWarnings${module_suffix}.xml - done - xml_file=$patch_dir/${name}FindbugsWarnings.xml - html_file=$patch_dir/${name}FindbugsWarnings.html - $FINDBUGS_HOME/bin/unionBugs -withMessages \ - -output $xml_file $patch_dir/${name}FindbugsWarnings*.xml - $FINDBUGS_HOME/bin/convertXmlToText -html $xml_file $html_file - file $xml_file $html_file -} - -############################################################################### -setup () { - ### Download latest patch file (ignoring .htm and .html) when run from patch process - if [[ $JENKINS == "true" ]] ; then - $WGET -q -O $PATCH_DIR/jira http://issues.apache.org/jira/browse/$defect - if [[ `$GREP -c 'Patch Available' $PATCH_DIR/jira` == 0 ]] ; then - echo "$defect is not \"Patch Available\". Exiting." - cleanupAndExit 0 - fi - relativePatchURL=`$GREP -o '"/jira/secure/attachment/[0-9]*/[^"]*' $PATCH_DIR/jira | $EGREP '(\.txt$|\.patch$|\.diff$)' | sort | tail -1 | $GREP -o '/jira/secure/attachment/[0-9]*/[^"]*'` - patchURL="http://issues.apache.org${relativePatchURL}" - patchNum=`echo $patchURL | $GREP -o '[0-9]*/' | $GREP -o '[0-9]*'` - # ensure attachment has not already been tested - ATTACHMENT_ID=$(basename $(dirname $patchURL)) - if grep -q "ATTACHMENT ID: $ATTACHMENT_ID" $PATCH_DIR/jira - then - echo "Attachment $ATTACHMENT_ID is already tested for $defect" - exit 1 - fi - echo "$defect patch is being downloaded at `date` from" - echo "$patchURL" - $WGET -q -O $PATCH_DIR/patch $patchURL - VERSION=${GIT_COMMIT}_${defect}_PATCH-${patchNum} - findBranchNameFromPatchName ${relativePatchURL} - checkoutBranch - JIRA_COMMENT="Here are the results of testing the latest attachment - $patchURL - against ${BRANCH_NAME} branch at commit ${GIT_COMMIT}. - ATTACHMENT ID: ${ATTACHMENT_ID}" - - ### Copy the patch file to $PATCH_DIR - else - VERSION=PATCH-${defect} - cp $PATCH_FILE $PATCH_DIR/patch - if [[ $? == 0 ]] ; then - echo "Patch file $PATCH_FILE copied to $PATCH_DIR" - else - echo "Could not copy $PATCH_FILE to $PATCH_DIR" - cleanupAndExit 0 - fi - fi - ### exit if warnings are NOT defined in the properties file - if [[ -z "$OK_JAVADOC_WARNINGS" ]] || [[ -z $OK_RELEASEAUDIT_WARNINGS ]] ; then - echo "Please define the following properties in test-patch.properties file" - echo "OK_RELEASEAUDIT_WARNINGS" - echo "OK_JAVADOC_WARNINGS" - cleanupAndExit 1 - fi - echo "" - echo "" - echo "======================================================================" - echo "======================================================================" - echo " Pre-build master to verify stability and javac warnings" - echo "======================================================================" - echo "======================================================================" - echo "" - echo "" - echo "$MVN clean package checkstyle:checkstyle-aggregate findbugs:findbugs -DskipTests \ - -D${PROJECT_NAME}PatchProcess > $PATCH_DIR/trunkJavacWarnings.txt 2>&1" - export MAVEN_OPTS="${MAVEN_OPTS}" - # build core and tests - $MVN clean package checkstyle:checkstyle-aggregate findbugs:findbugs -DskipTests \ - -D${PROJECT_NAME}PatchProcess > $PATCH_DIR/trunkJavacWarnings.txt 2>&1 - if [[ $? 
!= 0 ]] ; then - echo "mvn exit code was $?" - ERR=`$GREP -A 5 'Compilation failure' $PATCH_DIR/trunkJavacWarnings.txt` - if [[ ${#ERR} -ge 1 ]] ; then - echo "Trunk compilation is broken? - {code}$ERR{code}" - cleanupAndExit 1 - fi - fi - mv target/checkstyle-result.xml $PATCH_DIR/trunkCheckstyle.xml - collectFindbugsReports trunk $BASEDIR $PATCH_DIR -} - -############################################################################### -### Check for @author tags in the patch -checkAuthor () { - echo "" - echo "" - echo "======================================================================" - echo "======================================================================" - echo " Checking there are no @author tags in the patch." - echo "======================================================================" - echo "======================================================================" - echo "" - echo "" - authorTags=`$GREP -c -i '@author' $PATCH_DIR/patch` - echo "There appear to be $authorTags @author tags in the patch." - if [[ $authorTags != 0 ]] ; then - JIRA_COMMENT="$JIRA_COMMENT - - {color:red}-1 @author{color}. The patch appears to contain $authorTags @author tags which the Hadoop community has agreed to not allow in code contributions." - return 1 - fi - JIRA_COMMENT="$JIRA_COMMENT - - {color:green}+1 @author{color}. The patch does not contain any @author tags." - return 0 -} - -############################################################################### -### Check for tests in the patch -checkTests () { - echo "" - echo "" - echo "======================================================================" - echo "======================================================================" - echo " Checking there are new or changed tests in the patch." - echo "======================================================================" - echo "======================================================================" - echo "" - echo "" - testReferences=`$GREP -c -i '/test' $PATCH_DIR/patch` - echo "There appear to be $testReferences test files referenced in the patch." - if [[ $testReferences == 0 ]] ; then - if [[ $JENKINS == "true" ]] ; then - patchIsDoc=`$GREP -c -i 'title="documentation' $PATCH_DIR/jira` - if [[ $patchIsDoc != 0 ]] ; then - echo "The patch appears to be a documentation patch that doesn't require tests." - JIRA_COMMENT="$JIRA_COMMENT - - {color:green}+0 tests included{color}. The patch appears to be a documentation patch that doesn't require tests." - return 0 - fi - fi - srcReferences=`${GREP} "diff --git" "${PATCH_DIR}/patch" | ${GREP} "src/main" | \ - ${GREP} -v "src/main/asciidoc" | ${GREP} -v "src/main/site" -c` - if [[ $srcReferences == 0 ]] ; then - echo "The patch doesn't appear to alter any code that requires tests." - JIRA_COMMENT="$JIRA_COMMENT - - {color:green}+0 tests included{color}. The patch appears to be a documentation, build, - or dev-support patch that doesn't require tests." - return 0 - fi - JIRA_COMMENT="$JIRA_COMMENT - - {color:red}-1 tests included{color}. The patch doesn't appear to include any new or modified tests. - Please justify why no new tests are needed for this patch. - Also please list what manual steps were performed to verify this patch." - return 1 - fi - JIRA_COMMENT="$JIRA_COMMENT - - {color:green}+1 tests included{color}. The patch appears to include $testReferences new or modified tests." 
- return 0 -} - -############################################################################### -### Check there are no compilation errors, passing a file to be parsed. -checkCompilationErrors() { - local file=$1 - hadoopVersion="" - if [ "$#" -ne 1 ]; then - hadoopVersion="with Hadoop version $2" - fi - COMPILATION_ERROR=false - eval $(awk '/ERROR/ {print "COMPILATION_ERROR=true"}' $file) - if $COMPILATION_ERROR ; then - ERRORS=$($AWK '/ERROR/ { print $0 }' $file) - echo "======================================================================" - echo "There are compilation errors $hadoopVersion." - echo "======================================================================" - echo "$ERRORS" - JIRA_COMMENT="$JIRA_COMMENT - - {color:red}-1 javac{color}. The patch appears to cause mvn compile goal to fail $hadoopVersion. - - Compilation errors resume: - $ERRORS - " - submitJiraComment 1 - cleanupAndExit 1 - fi -} - -############################################################################### -### Check there are no protoc compilation errors, passing a file to be parsed. -checkProtocCompilationErrors() { - local file=$1 - COMPILATION_ERROR=false - eval $(awk '/\[ERROR/ {print "COMPILATION_ERROR=true"}' $file) - if $COMPILATION_ERROR ; then - ERRORS=$($AWK '/\[ERROR/ { print $0 }' $file) - echo "======================================================================" - echo "There are Protoc compilation errors." - echo "======================================================================" - echo "$ERRORS" - JIRA_COMMENT="$JIRA_COMMENT - - {color:red}-1 javac{color}. The patch appears to cause mvn compile-protobuf profile to fail. - - Protoc Compilation errors resume: - $ERRORS - " - cleanupAndExit 1 - fi -} - -############################################################################### -### Attempt to apply the patch -applyPatch () { - echo "" - echo "" - echo "======================================================================" - echo "======================================================================" - echo " Applying patch." - echo "======================================================================" - echo "======================================================================" - echo "" - echo "" - - export PATCH - $BASEDIR/dev-support/smart-apply-patch.sh $PATCH_DIR/patch - if [[ $? != 0 ]] ; then - echo "PATCH APPLICATION FAILED" - JIRA_COMMENT="$JIRA_COMMENT - - {color:red}-1 patch{color}. The patch command could not apply the patch." - return 1 - fi - return 0 -} - -############################################################################### -### Check against known anti-patterns -checkAntiPatterns () { - echo "" - echo "" - echo "======================================================================" - echo "======================================================================" - echo " Checking against known anti-patterns." 
- echo "======================================================================" - echo "======================================================================" - echo "" - echo "" - warnings=`$GREP 'new TreeMap $PATCH_DIR/patchJavadocWarnings.txt 2>&1" - export MAVEN_OPTS="${MAVEN_OPTS}" - $MVN clean package javadoc:javadoc -DskipTests -D${PROJECT_NAME}PatchProcess > $PATCH_DIR/patchJavadocWarnings.txt 2>&1 - javadocWarnings=`$GREP '\[WARNING\]' $PATCH_DIR/patchJavadocWarnings.txt | $AWK '/Javadoc Warnings/,EOF' | $GREP warning | $AWK 'BEGIN {total = 0} {total += 1} END {print total}'` - echo "" - echo "" - echo "There appear to be $javadocWarnings javadoc warnings generated by the patched build." - - ### if current warnings greater than OK_JAVADOC_WARNINGS - if [[ $javadocWarnings -gt $OK_JAVADOC_WARNINGS ]] ; then - JIRA_COMMENT="$JIRA_COMMENT - - {color:red}-1 javadoc{color}. The javadoc tool appears to have generated `expr $(($javadocWarnings-$OK_JAVADOC_WARNINGS))` warning messages." - # Add javadoc output url - JIRA_COMMENT_FOOTER="Javadoc warnings: $BUILD_URL/artifact/patchprocess/patchJavadocWarnings.txt -$JIRA_COMMENT_FOOTER" - return 1 - fi - JIRA_COMMENT="$JIRA_COMMENT - - {color:green}+1 javadoc{color}. The javadoc tool did not generate any warning messages." - return 0 -} - -checkBuildWithHadoopVersions() { - echo "" - echo "" - echo "======================================================================" - echo "======================================================================" - echo " Building with all supported Hadoop versions ." - echo "======================================================================" - echo "======================================================================" - echo "" - echo "" - export MAVEN_OPTS="${MAVEN_OPTS}" - for HADOOP2_VERSION in $HADOOP2_VERSIONS ; do - echo "$MVN clean install -DskipTests -D${PROJECT_NAME}PatchProcess -Dhadoop-two.version=$HADOOP2_VERSION > $PATCH_DIR/patchJavacWithHadoop-$HADOOP2_VERSION.txt 2>&1" - $MVN clean install -DskipTests -D${PROJECT_NAME}PatchProcess -Dhadoop-two.version=$HADOOP2_VERSION > $PATCH_DIR/patchJavacWithHadoop-$HADOOP2_VERSION.txt 2>&1 - checkCompilationErrors $PATCH_DIR/patchJavacWithHadoop-$HADOOP2_VERSION.txt $HADOOP2_VERSION - done - - # TODO: add Hadoop3 versions and compilation here when we get the hadoop.profile=3.0 working - - JIRA_COMMENT="$JIRA_COMMENT - - {color:green}+1 hadoop versions{color}. The patch compiles with all supported hadoop versions ($HADOOP2_VERSIONS)" - return 0 -} - -############################################################################### -### Check there are no changes in the number of Javac warnings -checkJavacWarnings () { - echo "" - echo "" - echo "======================================================================" - echo "======================================================================" - echo " Determining number of patched javac warnings." 
- echo "======================================================================" - echo "======================================================================" - echo "" - echo "" - echo "$MVN clean package -DskipTests -D${PROJECT_NAME}PatchProcess > $PATCH_DIR/patchJavacWarnings.txt 2>&1" - export MAVEN_OPTS="${MAVEN_OPTS}" - $MVN clean package -DskipTests -D${PROJECT_NAME}PatchProcess > $PATCH_DIR/patchJavacWarnings.txt 2>&1 - checkCompilationErrors $PATCH_DIR/patchJavacWarnings.txt - ### Compare trunk and patch javac warning numbers - if [[ -f $PATCH_DIR/patchJavacWarnings.txt ]] ; then - trunkJavacWarnings=`$GREP '\[WARNING\]' $PATCH_DIR/trunkJavacWarnings.txt | $AWK 'BEGIN {total = 0} {total += 1} END {print total}'` - patchJavacWarnings=`$GREP '\[WARNING\]' $PATCH_DIR/patchJavacWarnings.txt | $AWK 'BEGIN {total = 0} {total += 1} END {print total}'` - echo "There appear to be $trunkJavacWarnings javac compiler warnings before the patch and $patchJavacWarnings javac compiler warnings after applying the patch." - if [[ $patchJavacWarnings != "" && $trunkJavacWarnings != "" ]] ; then - if [[ $patchJavacWarnings -gt $trunkJavacWarnings ]] ; then - JIRA_COMMENT="$JIRA_COMMENT - - {color:red}-1 javac{color}. The applied patch generated $patchJavacWarnings javac compiler warnings (more than the master's current $trunkJavacWarnings warnings)." - return 1 - fi - fi - fi - JIRA_COMMENT="$JIRA_COMMENT - - {color:green}+1 javac{color}. The applied patch does not increase the total number of javac compiler warnings." - return 0 -} - -checkCheckstyleErrors() { - echo "" - echo "" - echo "======================================================================" - echo "======================================================================" - echo " Determining number of patched Checkstyle errors." - echo "======================================================================" - echo "======================================================================" - echo "" - echo "" - if [[ -f $PATCH_DIR/trunkCheckstyle.xml ]] ; then - $MVN package -DskipTests checkstyle:checkstyle-aggregate > /dev/null 2>&1 - mv target/checkstyle-result.xml $PATCH_DIR/patchCheckstyle.xml - mv target/site/checkstyle-aggregate.html $PATCH_DIR - mv target/site/checkstyle.css $PATCH_DIR - $BASEDIR/dev-support/checkstyle_report.py $PATCH_DIR/trunkCheckstyle.xml $PATCH_DIR/patchCheckstyle.xml - if [[ $? -eq 1 ]] ; then - JIRA_COMMENT_FOOTER="Checkstyle Errors: $BUILD_URL/artifact/patchprocess/checkstyle-aggregate.html - - $JIRA_COMMENT_FOOTER" - - JIRA_COMMENT="$JIRA_COMMENT - - {color:red}-1 checkstyle{color}. The applied patch generated new checkstyle errors. Check build console for list of new errors." - return 1 - fi - fi - JIRA_COMMENT_FOOTER="Checkstyle Errors: $BUILD_URL/artifact/patchprocess/checkstyle-aggregate.html - - $JIRA_COMMENT_FOOTER" - - JIRA_COMMENT="$JIRA_COMMENT - - {color:green}+1 checkstyle{color}. The applied patch does not generate new checkstyle errors." - return 0 - -} -############################################################################### -checkProtocErrors () { - echo "" - echo "" - echo "======================================================================" - echo "======================================================================" - echo " Determining whether there is patched protoc error." 
- echo "======================================================================" - echo "======================================================================" - echo "" - echo "" - echo "$MVN clean install -DskipTests -Pcompile-protobuf -X -D${PROJECT_NAME}PatchProcess > $PATCH_DIR/patchProtocErrors.txt 2>&1" - export MAVEN_OPTS="${MAVEN_OPTS}" - $MVN clean install -DskipTests -Pcompile-protobuf -X -D${PROJECT_NAME}PatchProcess > $PATCH_DIR/patchProtocErrors.txt 2>&1 - checkProtocCompilationErrors $PATCH_DIR/patchProtocErrors.txt - JIRA_COMMENT="$JIRA_COMMENT - - {color:green}+1 protoc{color}. The applied patch does not increase the total number of protoc compiler warnings." - return 0 -} - -############################################################################### -### Check there are no changes in the number of release audit (RAT) warnings -checkReleaseAuditWarnings () { - echo "" - echo "" - echo "======================================================================" - echo "======================================================================" - echo " Determining number of patched release audit warnings." - echo "======================================================================" - echo "======================================================================" - echo "" - echo "" - echo "$MVN apache-rat:check -D${PROJECT_NAME}PatchProcess 2>&1" - export MAVEN_OPTS="${MAVEN_OPTS}" - $MVN apache-rat:check -D${PROJECT_NAME}PatchProcess 2>&1 - find $BASEDIR -name rat.txt | xargs cat > $PATCH_DIR/patchReleaseAuditWarnings.txt - - ### Compare trunk and patch release audit warning numbers - if [[ -f $PATCH_DIR/patchReleaseAuditWarnings.txt ]] ; then - patchReleaseAuditWarnings=`$GREP -c '\!?????' $PATCH_DIR/patchReleaseAuditWarnings.txt` - echo "" - echo "" - echo "There appear to be $OK_RELEASEAUDIT_WARNINGS release audit warnings before the patch and $patchReleaseAuditWarnings release audit warnings after applying the patch." - if [[ $patchReleaseAuditWarnings != "" && $OK_RELEASEAUDIT_WARNINGS != "" ]] ; then - if [[ $patchReleaseAuditWarnings -gt $OK_RELEASEAUDIT_WARNINGS ]] ; then - JIRA_COMMENT="$JIRA_COMMENT - - {color:red}-1 release audit{color}. The applied patch generated $patchReleaseAuditWarnings release audit warnings (more than the master's current $OK_RELEASEAUDIT_WARNINGS warnings)." - $GREP '\!?????' $PATCH_DIR/patchReleaseAuditWarnings.txt > $PATCH_DIR/patchReleaseAuditProblems.txt - echo "Lines that start with ????? in the release audit report indicate files that do not have an Apache license header." >> $PATCH_DIR/patchReleaseAuditProblems.txt - JIRA_COMMENT_FOOTER="Release audit warnings: $BUILD_URL/artifact/patchprocess/patchReleaseAuditWarnings.txt -$JIRA_COMMENT_FOOTER" - return 1 - fi - fi - fi - JIRA_COMMENT="$JIRA_COMMENT - - {color:green}+1 release audit{color}. The applied patch does not increase the total number of release audit warnings." - return 0 -} - -############################################################################### -### Check there are no changes in the number of Findbugs warnings -checkFindbugsWarnings () { - findbugs_version=`${FINDBUGS_HOME}/bin/findbugs -version` - echo "" - echo "" - echo "======================================================================" - echo "======================================================================" - echo " Determining number of patched Findbugs warnings." 
- echo "======================================================================" - echo "======================================================================" - echo "" - echo "" - echo "$MVN clean package findbugs:findbugs -D${PROJECT_NAME}PatchProcess" - export MAVEN_OPTS="${MAVEN_OPTS}" - $MVN clean package findbugs:findbugs -D${PROJECT_NAME}PatchProcess -DskipTests < /dev/null - - if [ $? != 0 ] ; then - JIRA_COMMENT="$JIRA_COMMENT - - {color:red}-1 findbugs{color}. The patch appears to cause Findbugs (version ${findbugs_version}) to fail." - return 1 - fi - - collectFindbugsReports patch $BASEDIR $PATCH_DIR - #this files are generated by collectFindbugsReports() named with its first argument - patch_xml=$PATCH_DIR/patchFindbugsWarnings.xml - trunk_xml=$PATCH_DIR/trunkFindbugsWarnings.xml - # combine them to one database - combined_xml=$PATCH_DIR/combinedFindbugsWarnings.xml - new_xml=$PATCH_DIR/newFindbugsWarnings.xml - new_html=$PATCH_DIR/newFindbugsWarnings.html - $FINDBUGS_HOME/bin/computeBugHistory -useAnalysisTimes -withMessages \ - -output $combined_xml $trunk_xml $patch_xml - findbugsWarnings=$($FINDBUGS_HOME/bin/filterBugs -first patch $combined_xml $new_xml) - findbugsFixedWarnings=$($FINDBUGS_HOME/bin/filterBugs -fixed patch $combined_xml $new_xml) - $FINDBUGS_HOME/bin/convertXmlToText -html $new_xml $new_html - file $new_xml $new_html - JIRA_COMMENT_FOOTER="Release Findbugs (version ${findbugs_version}) \ - warnings: $BUILD_URL/artifact/patchprocess/newFindbugsWarnings.html -$JIRA_COMMENT_FOOTER" - ### if current warnings greater than 0, fail - if [[ $findbugsWarnings -gt 0 ]] ; then - JIRA_COMMENT="$JIRA_COMMENT - - {color:red}-1 findbugs{color}. The patch appears to introduce $findbugsWarnings \ - new Findbugs (version ${findbugs_version}) warnings." - return 1 - fi - JIRA_COMMENT="$JIRA_COMMENT - - {color:green}+1 findbugs{color}. The patch does not introduce any \ - new Findbugs (version ${findbugs_version}) warnings." - return 0 -} - -############################################################################### -### Check line lengths -checkLineLengths () { - echo "" - echo "" - echo "======================================================================" - echo "======================================================================" - echo " Checking that no line have length > $MAX_LINE_LENGTH" - echo "======================================================================" - echo "======================================================================" - echo "" - echo "" - #see http://en.wikipedia.org/wiki/Diff#Unified_format - - MAX_LINE_LENGTH_PATCH=`expr $MAX_LINE_LENGTH + 1` - lines=`cat $PATCH_DIR/patch | grep "^+" | grep -v "^@@" | grep -v "^+++" | grep -v "import" | grep -v "org.apache.thrift." | grep -v "com.google.protobuf." | grep -v "protobuf.generated" | awk -v len="$MAX_LINE_LENGTH_PATCH" 'length ($0) > len' | head -n 10` - ll=`echo "$lines" | wc -l` - if [[ "$ll" -gt "1" ]]; then - JIRA_COMMENT="$JIRA_COMMENT - - {color:red}-1 lineLengths{color}. The patch introduces the following lines longer than $MAX_LINE_LENGTH: - $lines" - - return 1 - fi - JIRA_COMMENT="$JIRA_COMMENT - - {color:green}+1 lineLengths{color}. 
The patch does not introduce lines longer than $MAX_LINE_LENGTH" - return 0 -} - -############################################################################### -### Run the tests -runTests () { - echo "" - echo "" - echo "======================================================================" - echo "======================================================================" - echo " Running tests." - echo "======================================================================" - echo "======================================================================" - echo "" - echo "" - failed_tests="" - echo "$MVN clean test -Dsurefire.rerunFailingTestsCount=2 -P runAllTests -D${PROJECT_NAME}PatchProcess" - export MAVEN_OPTS="${MAVEN_OPTS}" - ulimit -a - # Need to export this so the zombie subshell picks up current content - export JIRA_COMMENT - $MVN clean test -Dsurefire.rerunFailingTestsCount=2 -P runAllTests -D${PROJECT_NAME}PatchProcess - if [[ $? != 0 ]] ; then - ### Find and format names of failed tests - failed_tests=`find . -name 'TEST*.xml' | xargs $GREP -l -E " $PATCH_DIR/patchSiteOutput.txt 2>&1" - export MAVEN_OPTS="${MAVEN_OPTS}" - $MVN package post-site -DskipTests -D${PROJECT_NAME}PatchProcess > $PATCH_DIR/patchSiteOutput.txt 2>&1 - if [[ $? != 0 ]] ; then - JIRA_COMMENT="$JIRA_COMMENT - - {color:red}-1 site{color}. The patch appears to cause mvn post-site goal to fail." - return 1 - fi - JIRA_COMMENT="$JIRA_COMMENT - - {color:green}+1 site{color}. The mvn post-site goal succeeds with this patch." - return 0 -} - -############################################################################### -### Run the inject-system-faults target -checkInjectSystemFaults () { - echo "" - echo "" - echo "======================================================================" - echo "======================================================================" - echo " Checking the integrity of system test framework code." - echo "======================================================================" - echo "======================================================================" - echo "" - echo "" - - ### Kill any rogue build processes from the last attempt - $PS auxwww | $GREP ${PROJECT_NAME}PatchProcess | $AWK '{print $2}' | /usr/bin/xargs -t -I {} /bin/kill -9 {} > /dev/null - - #echo "$ANT_HOME/bin/ant -Dversion="${VERSION}" -DHadoopPatchProcess= -Dtest.junit.output.format=xml -Dtest.output=no -Dcompile.c++=yes -Dforrest.home=$FORREST_HOME inject-system-faults" - #$ANT_HOME/bin/ant -Dversion="${VERSION}" -DHadoopPatchProcess= -Dtest.junit.output.format=xml -Dtest.output=no -Dcompile.c++=yes -Dforrest.home=$FORREST_HOME inject-system-faults - echo "NOP" - return 0 - if [[ $? != 0 ]] ; then - JIRA_COMMENT="$JIRA_COMMENT - - {color:red}-1 system test framework{color}. The patch failed system test framework compile." - return 1 - fi - JIRA_COMMENT="$JIRA_COMMENT - - {color:green}+1 system test framework{color}. The patch passed system test framework compile." - return 0 -} - -############################################################################### -### Submit a comment to the defect's Jira -submitJiraComment () { - local result=$1 - ### Do not output the value of JIRA_COMMENT_FOOTER when run by a developer - if [[ $JENKINS == "false" ]] ; then - JIRA_COMMENT_FOOTER="" - fi - if [[ $result == 0 ]] ; then - comment="{color:green}+1 overall{color}. $JIRA_COMMENT - -$JIRA_COMMENT_FOOTER" - else - comment="{color:red}-1 overall{color}. 
$JIRA_COMMENT - -$JIRA_COMMENT_FOOTER" - fi - ### Output the test result to the console - echo " - - - -$comment" - - if [[ $JENKINS == "true" ]] ; then - echo "" - echo "" - echo "======================================================================" - echo "======================================================================" - echo " Adding comment to Jira." - echo "======================================================================" - echo "======================================================================" - echo "" - echo "" - ### Update Jira with a comment - export USER=hudson - $JIRACLI -s https://issues.apache.org/jira -a addcomment -u hadoopqa -p $JIRA_PASSWD --comment "$comment" --issue $defect - $JIRACLI -s https://issues.apache.org/jira -a logout -u hadoopqa -p $JIRA_PASSWD - fi -} - -############################################################################### -### Cleanup files -cleanupAndExit () { - local result=$1 - if [[ ${JENKINS} == "true" && ${MOVE_PATCH_DIR} == "true" ]] ; then - if [ -e "$PATCH_DIR" ] ; then - echo "Relocating patch dir into ${BASEDIR}" - mv $PATCH_DIR $BASEDIR - fi - fi - echo "" - echo "" - echo "======================================================================" - echo "======================================================================" - echo " Finished build." - echo "======================================================================" - echo "======================================================================" - echo "" - echo "" - exit $result -} - -############################################################################### -############################################################################### -############################################################################### - -JIRA_COMMENT="" -JIRA_COMMENT_FOOTER="Console output: $BUILD_URL/console - -This message is automatically generated." - -### Check if arguments to the script have been specified properly or not -parseArgs $@ -cd $BASEDIR - -echo "Version of this script: Wed Oct 14 00:29:04 PDT 2015" -checkout -RESULT=$? -echo "RESULT = " $RESULT -if [[ $JENKINS == "true" ]] ; then - if [[ $RESULT != 0 ]] ; then - exit 100 - fi -fi -setup -checkAuthor -RESULT=$? -echo "RESULT = " $RESULT -checkTests -(( RESULT = RESULT + $? )) -echo "RESULT = " $RESULT -applyPatch -if [[ $? != 0 ]] ; then - submitJiraComment 1 - cleanupAndExit 1 -fi - -checkAntiPatterns -(( RESULT = RESULT + $? )) -echo "RESULT = " $RESULT -checkBuildWithHadoopVersions -(( RESULT = RESULT + $? )) -echo "RESULT = " $RESULT -checkJavacWarnings -(( RESULT = RESULT + $? )) -echo "RESULT = " $RESULT -checkProtocErrors -(( RESULT = RESULT + $? )) -echo "RESULT = " $RESULT -checkJavadocWarnings -(( RESULT = RESULT + $? )) -echo "RESULT = " $RESULT -checkCheckstyleErrors -(( RESULT = RESULT + $? )) -echo "RESULT = " $RESULT -checkInterfaceAudience -(( RESULT = RESULT + $? )) -echo "RESULT = " $RESULT -checkFindbugsWarnings -(( RESULT = RESULT + $? )) -echo "RESULT = " $RESULT -checkReleaseAuditWarnings -(( RESULT = RESULT + $? )) -echo "RESULT = " $RESULT -checkLineLengths -(( RESULT = RESULT + $? )) -echo "RESULT = " $RESULT -checkSiteXml -(( RESULT = RESULT + $?)) -echo "RESULT = " $RESULT -### Do not call these when run by a developer -if [[ $JENKINS == "true" ]] ; then - runTests - (( RESULT = RESULT + $? 
)) - echo "RESULT = " $RESULT -JIRA_COMMENT_FOOTER="Test results: $BUILD_URL/testReport/ -$JIRA_COMMENT_FOOTER" -fi -submitJiraComment $RESULT -cleanupAndExit $RESULT diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java index 1bd4e07..0fb0455 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java @@ -40,6 +40,8 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.normalizer.NormalizationPlan; +import org.apache.hadoop.hbase.normalizer.NormalizationPlan.PlanType; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ColumnFamilySchema; @@ -185,13 +187,14 @@ public class HTableDescriptor implements Comparable { /** * INTERNAL Used by shell/rest interface to access this metadata - * attribute which denotes if the table should be treated by region normalizer. + * attribute which denotes the allowed types of action (split/merge) when the table is treated + * by region normalizer. * - * @see #isNormalizationEnabled() + * @see #getDesiredNormalizationTypes() */ - public static final String NORMALIZATION_ENABLED = "NORMALIZATION_ENABLED"; - private static final Bytes NORMALIZATION_ENABLED_KEY = - new Bytes(Bytes.toBytes(NORMALIZATION_ENABLED)); + public static final String NORMALIZATION_MODE = "NORMALIZATION_MODE"; + private static final Bytes NORMALIZATION_MODE_KEY = + new Bytes(Bytes.toBytes(NORMALIZATION_MODE)); /** Default durability for HTD is USE_DEFAULT, which defaults to HBase-global default value */ private static final Durability DEFAULT_DURABLITY = Durability.USE_DEFAULT; @@ -220,11 +223,6 @@ public class HTableDescriptor implements Comparable { public static final boolean DEFAULT_COMPACTION_ENABLED = true; /** - * Constant that denotes whether the table is normalized by default. - */ - public static final boolean DEFAULT_NORMALIZATION_ENABLED = false; - - /** * Constant that denotes the maximum default size of the memstore after which * the contents are flushed to the store files */ @@ -249,7 +247,7 @@ public class HTableDescriptor implements Comparable { String.valueOf(DEFAULT_DEFERRED_LOG_FLUSH)); DEFAULT_VALUES.put(DURABILITY, DEFAULT_DURABLITY.name()); //use the enum name DEFAULT_VALUES.put(REGION_REPLICATION, String.valueOf(DEFAULT_REGION_REPLICATION)); - DEFAULT_VALUES.put(NORMALIZATION_ENABLED, String.valueOf(DEFAULT_NORMALIZATION_ENABLED)); + DEFAULT_VALUES.put(NORMALIZATION_MODE, ""); for (String s : DEFAULT_VALUES.keySet()) { RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(s))); } @@ -640,22 +638,42 @@ public class HTableDescriptor implements Comparable { } /** - * Check if normalization enable flag of the table is true. If flag is - * false then no region normalizer won't attempt to normalize this table. + * Check if normalization flag of the table. If flag is + * empty then region normalizer won't attempt to normalize this table. 
* - * @return true if region normalization is enabled for this table + * @return List of PlanType if region normalization is enabled for this table + * null means region normalization is disabled */ - public boolean isNormalizationEnabled() { - return isSomething(NORMALIZATION_ENABLED_KEY, DEFAULT_NORMALIZATION_ENABLED); + public List getDesiredNormalizationTypes() { + byte [] value = getValue(NORMALIZATION_MODE_KEY); + if (value == null) { + return null; + } + String strValue = Bytes.toString(value); + if (strValue.isEmpty()) { + return null; + } + List types = new ArrayList<>(); + if (strValue.toUpperCase().contains("M")) { + types.add(PlanType.MERGE); + } + if (strValue.toUpperCase().contains("S")) { + types.add(PlanType.SPLIT); + } + return types; } /** - * Setting the table normalization enable flag. + * Setting the types of action for table normalization mode flag. * - * @param isEnable True if enable normalization. - */ - public HTableDescriptor setNormalizationEnabled(final boolean isEnable) { - setValue(NORMALIZATION_ENABLED_KEY, isEnable ? TRUE : FALSE); + * @param types String containing desired types of action: + * "M" for region merge + * "S" for region split + * "MS" for region merge / split + */ + public HTableDescriptor setNormalizationMode(final String types) { + setValue(NORMALIZATION_MODE_KEY, types == null || types.isEmpty() ? null : + new Bytes(Bytes.toBytes(types.toUpperCase()))); return this; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java index fe9745e..b910265 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java @@ -271,6 +271,7 @@ public class Scan extends Query { this.getScan = true; this.asyncPrefetch = false; this.consistency = get.getConsistency(); + this.setIsolationLevel(get.getIsolationLevel()); for (Map.Entry attr : get.getAttributesMap().entrySet()) { setAttribute(attr.getKey(), attr.getValue()); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java index ac76edb..a7759c5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java @@ -265,7 +265,14 @@ public enum EventType { * * RS_REGION_REPLICA_FLUSH */ - RS_REGION_REPLICA_FLUSH (82, ExecutorType.RS_REGION_REPLICA_FLUSH_OPS); + RS_REGION_REPLICA_FLUSH (82, ExecutorType.RS_REGION_REPLICA_FLUSH_OPS), + + /** + * RS compacted files discharger
+ * + * RS_COMPACTED_FILES_DISCHARGER + */ + RS_COMPACTED_FILES_DISCHARGER (83, ExecutorType.RS_COMPACTED_FILES_DISCHARGER); private final int code; private final ExecutorType executor; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java index d0f6bee..5a16149 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java @@ -46,7 +46,8 @@ public enum ExecutorType { RS_CLOSE_META (25), RS_PARALLEL_SEEK (26), RS_LOG_REPLAY_OPS (27), - RS_REGION_REPLICA_FLUSH_OPS (28); + RS_REGION_REPLICA_FLUSH_OPS (28), + RS_COMPACTED_FILES_DISCHARGER (29); ExecutorType(int value) {} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/normalizer/NormalizationPlan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/normalizer/NormalizationPlan.java new file mode 100644 index 0000000..66481e6 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/normalizer/NormalizationPlan.java @@ -0,0 +1,45 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.normalizer; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Admin; + +/** + * Interface for normalization plan. + */ +@InterfaceAudience.Private +public interface NormalizationPlan { + enum PlanType { + SPLIT, + MERGE, + NONE + } + + /** + * Executes normalization plan on cluster (does actual splitting/merging work). 
+ * @param admin instance of Admin + */ + void execute(Admin admin); + + /** + * @return the type of this plan + */ + PlanType getType(); +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestIncrement.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestIncrement.java index 4b9f113..c38340d 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestIncrement.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestIncrement.java @@ -30,7 +30,7 @@ import org.junit.experimental.categories.Category; @Category({ClientTests.class, SmallTests.class}) public class TestIncrement { @Test - public void test() { + public void testIncrementInstance() { final long expected = 13; Increment inc = new Increment(new byte [] {'r'}); int total = 0; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java index 1d55baa..c6698f5 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java @@ -75,4 +75,4 @@ public interface Tag { * @return The {@link java.nio.ByteBuffer} containing the value bytes. */ ByteBuffer getValueByteBuffer(); -} +} \ No newline at end of file diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagUtil.java index 15ddfc8..f59680e 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagUtil.java @@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.Tag.TAG_LENGTH_SIZE; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -104,7 +105,7 @@ public final class TagUtil { * @return the serialized tag data as bytes */ public static byte[] fromList(List tags) { - if (tags.isEmpty()) { + if (tags == null || tags.isEmpty()) { return HConstants.EMPTY_BYTE_ARRAY; } int length = 0; @@ -216,4 +217,28 @@ public final class TagUtil { } return StreamUtils.readRawVarint32(tag.getValueByteBuffer(), offset); } -} \ No newline at end of file + + /** + * @return A List of any Tags found in cell else null. + */ + public static List carryForwardTags(final Cell cell) { + return carryForwardTags(null, cell); + } + + /** + * @return Add to tagsOrNull any Tags cell is carrying or null if + * it is carrying no Tags AND the passed in tagsOrNull is null (else we return new + * List with Tags found). 
+ */ + public static List carryForwardTags(final List tagsOrNull, final Cell cell) { + List tags = tagsOrNull; + if (cell.getTagsLength() <= 0) return tags; + Iterator itr = + CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()); + if (tags == null) tags = new ArrayList(); + while (itr.hasNext()) { + tags.add(itr.next()); + } + return tags; + } +} diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterSource.java index ab621cc..290b8f5 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterSource.java @@ -57,6 +57,8 @@ public interface MetricsMasterSource extends BaseSource { String SERVER_NAME_NAME = "serverName"; String CLUSTER_ID_NAME = "clusterId"; String IS_ACTIVE_MASTER_NAME = "isActiveMaster"; + String SPLIT_PLAN_COUNT_NAME = "splitPlanCount"; + String MERGE_PLAN_COUNT_NAME = "mergePlanCount"; String CLUSTER_REQUESTS_NAME = "clusterRequests"; String MASTER_ACTIVE_TIME_DESC = "Master Active Time"; @@ -70,7 +72,8 @@ public interface MetricsMasterSource extends BaseSource { String SERVER_NAME_DESC = "Server Name"; String CLUSTER_ID_DESC = "Cluster Id"; String IS_ACTIVE_MASTER_DESC = "Is Active Master"; - + String SPLIT_PLAN_COUNT_DESC = "Number of Region Split Plans executed"; + String MERGE_PLAN_COUNT_DESC = "Number of Region Merge Plans executed"; /** diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterWrapper.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterWrapper.java index 678db69..5e67f83 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterWrapper.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterWrapper.java @@ -112,4 +112,13 @@ public interface MetricsMasterWrapper { */ long getNumWALFiles(); + /** + * Get the number of region split plans executed. + */ + long getSplitPlanCount(); + + /** + * Get the number of region merge plans executed. + */ + long getMergePlanCount(); } diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterSourceImpl.java index c5ce5e4..b0ba66e 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterSourceImpl.java @@ -74,6 +74,10 @@ public class MetricsMasterSourceImpl // masterWrapper can be null because this function is called inside of init. 
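An aside on the two `TagUtil.carryForwardTags` overloads added above: here is a minimal, illustrative sketch of how a caller might union the tags of an existing cell and an incoming cell and serialize the result. The class, method name, and TTL parameter are made up for illustration, and the import paths are assumed from the surrounding code; this is not part of the patch itself.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.TagUtil;
import org.apache.hadoop.hbase.util.Bytes;

/** Illustrative only; not part of this patch. */
public final class TagCarryForwardExample {

  /** Union the tags of oldCell and newCell, optionally adding a per-mutation TTL tag. */
  static byte[] mergeTags(Cell oldCell, Cell newCell, long ttlMillis) {
    // Returns null when oldCell carries no tags.
    List<Tag> tags = TagUtil.carryForwardTags(oldCell);
    // Appends newCell's tags (creating the list if needed) when newCell carries tags.
    tags = TagUtil.carryForwardTags(tags, newCell);
    if (ttlMillis != Long.MAX_VALUE) {
      if (tags == null) {
        tags = new ArrayList<Tag>(1);
      }
      tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttlMillis)));
    }
    // With the change above, fromList() tolerates a null list and returns an empty array.
    return TagUtil.fromList(tags);
  }

  private TagCarryForwardExample() {
  }
}
```

The null-tolerant `fromList` change is what lets callers skip a trailing null check before serializing.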
if (masterWrapper != null) { metricsRecordBuilder + .addGauge(Interns.info(MERGE_PLAN_COUNT_NAME, MERGE_PLAN_COUNT_DESC), + masterWrapper.getMergePlanCount()) + .addGauge(Interns.info(SPLIT_PLAN_COUNT_NAME, SPLIT_PLAN_COUNT_DESC), + masterWrapper.getSplitPlanCount()) .addGauge(Interns.info(MASTER_ACTIVE_TIME_NAME, MASTER_ACTIVE_TIME_DESC), masterWrapper.getActiveTime()) .addGauge(Interns.info(MASTER_START_TIME_NAME, diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java index 62f5c66..6f3baa0 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java @@ -433,6 +433,8 @@ public class DistributedHBaseCluster extends HBaseCluster { Set toKill = new TreeSet(new ServerNameIgnoreStartCodeComparator()); toStart.addAll(initial.getServers()); toKill.addAll(current.getServers()); + + ServerName master = initial.getMaster(); for (ServerName server : current.getServers()) { toStart.remove(server); @@ -447,7 +449,8 @@ public class DistributedHBaseCluster extends HBaseCluster { try { if (!clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER, sn.getHostname(), - sn.getPort())) { + sn.getPort()) + && master.getPort() != sn.getPort()) { LOG.info("Restoring cluster - starting initial region server: " + sn.getHostAndPort()); startRegionServer(sn.getHostname(), sn.getPort()); } @@ -460,7 +463,8 @@ public class DistributedHBaseCluster extends HBaseCluster { try { if (clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER, sn.getHostname(), - sn.getPort())) { + sn.getPort()) + && master.getPort() != sn.getPort()){ LOG.info("Restoring cluster - stopping initial region server: " + sn.getHostAndPort()); stopRegionServer(sn); } diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/AssignmentManagerStatusTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/AssignmentManagerStatusTmpl.jamon index 30a95ce..2bf034a 100644 --- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/AssignmentManagerStatusTmpl.jamon +++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/AssignmentManagerStatusTmpl.jamon @@ -91,7 +91,12 @@ if (toRemove > 0) { entry.getValue(), conf) %> <% (currentTime - entry.getValue().getStamp()) %> - Total number of Regions in Transition for more than <% ritThreshold %> milliseconds <% numOfRITOverThreshold %> + <%if numOfRITOverThreshold > 0 %> + + <%else> + + + Total number of Regions in Transition for more than <% ritThreshold %> milliseconds <% numOfRITOverThreshold %> Total number of Regions in Transition<% totalRITs %> diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java index 410fb39..335b672 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java @@ -37,6 +37,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.monitoring.ThreadMonitoring; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -78,7 +79,8 @@ public 
class ExecutorService { * started with the same name, this throws a RuntimeException. * @param name Name of the service to start. */ - void startExecutorService(String name, int maxThreads) { + @VisibleForTesting + public void startExecutorService(String name, int maxThreads) { if (this.executorMap.get(name) != null) { throw new RuntimeException("An executor service with the name " + name + " is already running!"); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index 4feb2e7..c319bb1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -72,7 +72,7 @@ import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper; import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer; import org.apache.hadoop.hbase.master.handler.DisableTableHandler; import org.apache.hadoop.hbase.master.handler.EnableTableHandler; -import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType; +import org.apache.hadoop.hbase.normalizer.NormalizationPlan.PlanType; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; import org.apache.hadoop.hbase.quotas.QuotaExceededException; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 2e42acb..2431681 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -92,7 +92,6 @@ import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; import org.apache.hadoop.hbase.master.cleaner.LogCleaner; import org.apache.hadoop.hbase.master.handler.DispatchMergingRegionHandler; -import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan; import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer; import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore; import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory; @@ -114,6 +113,8 @@ import org.apache.hadoop.hbase.mob.MobConstants; import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; +import org.apache.hadoop.hbase.normalizer.NormalizationPlan; +import org.apache.hadoop.hbase.normalizer.NormalizationPlan.PlanType; import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost; import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; @@ -332,6 +333,9 @@ public class HMaster extends HRegionServer implements MasterServices { // handle table states private TableStateManager tableStateManager; + + private long splitPlanCount; + private long mergePlanCount; /** flag used in test cases in order to simulate RS failures during master initialization */ private volatile boolean initializationBeforeMetaAssignment = false; @@ -1323,15 +1327,31 @@ public class HMaster extends HRegionServer implements MasterServices { for (TableName table : allEnabledTables) { 
TableDescriptor tblDesc = getTableDescriptors().getDescriptor(table); - if (table.isSystemTable() || (tblDesc != null && - tblDesc.getHTableDescriptor() != null && - !tblDesc.getHTableDescriptor().isNormalizationEnabled())) { - LOG.debug("Skipping normalization for table: " + table + ", as it's either system" - + " table or doesn't have auto normalization turned on"); + if (table.isSystemTable()) { + LOG.debug("Skipping normalization for table: " + table + ", as it's system table"); continue; } - NormalizationPlan plan = this.normalizer.computePlanForTable(table); - plan.execute(clusterConnection.getAdmin()); + List types = null; + if (tblDesc != null && + tblDesc.getHTableDescriptor() != null) { + types = tblDesc.getHTableDescriptor().getDesiredNormalizationTypes(); + if (types == null) { + LOG.debug("Skipping normalization for table: " + table + ", as it" + + " doesn't have auto normalization turned on"); + continue; + } + } + List plans = this.normalizer.computePlanForTable(table, types); + if (plans != null) { + for (NormalizationPlan plan : plans) { + plan.execute(clusterConnection.getAdmin()); + if (plan.getType() == PlanType.SPLIT) { + splitPlanCount++; + } else if (plan.getType() == PlanType.MERGE) { + mergePlanCount++; + } + } + } } } // If Region did not generate any plans, it means the cluster is already balanced. @@ -2327,6 +2347,20 @@ public class HMaster extends HRegionServer implements MasterServices { } return regionStates.getAverageLoad(); } + + /* + * @return the count of region split plans executed + */ + public long getSplitPlanCount() { + return splitPlanCount; + } + + /* + * @return the count of region merge plans executed + */ + public long getMergePlanCount() { + return mergePlanCount; + } @Override public boolean registerService(Service instance) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterWrapperImpl.java index a935a37..4cff28b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterWrapperImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterWrapperImpl.java @@ -40,6 +40,16 @@ public class MetricsMasterWrapperImpl implements MetricsMasterWrapper { } @Override + public long getSplitPlanCount() { + return master.getSplitPlanCount(); + } + + @Override + public long getMergePlanCount() { + return master.getMergePlanCount(); + } + + @Override public String getClusterId() { return master.getClusterId(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/EmptyNormalizationPlan.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/EmptyNormalizationPlan.java index 5aecc48..29cc0c3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/EmptyNormalizationPlan.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/EmptyNormalizationPlan.java @@ -20,7 +20,7 @@ package org.apache.hadoop.hbase.master.normalizer; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType; +import org.apache.hadoop.hbase.normalizer.NormalizationPlan; /** * Plan which signifies that no normalization is required, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizationPlan.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizationPlan.java index e2035bb..f3ce1d5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizationPlan.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizationPlan.java @@ -23,7 +23,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType; +import org.apache.hadoop.hbase.normalizer.NormalizationPlan; import java.io.IOException; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/NormalizationPlan.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/NormalizationPlan.java deleted file mode 100644 index 9f866d3..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/NormalizationPlan.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.master.normalizer; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Admin; - -/** - * Interface for normalization plan. - */ -@InterfaceAudience.Private -public interface NormalizationPlan { - enum PlanType { - SPLIT, - MERGE, - NONE - } - - /** - * Executes normalization plan on cluster (does actual splitting/merging work). 
- * @param admin instance of Admin - */ - void execute(Admin admin); - - /** - * @return the type of this plan - */ - PlanType getType(); -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizer.java index d60474d..c0083e6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizer.java @@ -18,12 +18,15 @@ */ package org.apache.hadoop.hbase.master.normalizer; +import java.util.List; + import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.master.MasterServices; -import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType; +import org.apache.hadoop.hbase.normalizer.NormalizationPlan; +import org.apache.hadoop.hbase.normalizer.NormalizationPlan.PlanType; /** * Performs "normalization" of regions on the cluster, making sure that suboptimal @@ -47,9 +50,11 @@ public interface RegionNormalizer { /** * Computes next optimal normalization plan. * @param table table to normalize - * @return Next (perhaps most urgent) normalization action to perform + * @param types desired types of NormalizationPlan + * @return normalization actions to perform. Null if no action to take */ - NormalizationPlan computePlanForTable(TableName table) throws HBaseIOException; + List computePlanForTable(TableName table, List types) + throws HBaseIOException; /** * Notification for the case where plan couldn't be executed due to constraint violation, such as diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerChore.java index 25118c7..c6baa2e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerChore.java @@ -38,7 +38,7 @@ public class RegionNormalizerChore extends ScheduledChore { public RegionNormalizerChore(HMaster master) { super(master.getServerName() + "-RegionNormalizerChore", master, - master.getConfiguration().getInt("hbase.normalizer.period", 1800000)); + master.getConfiguration().getInt("hbase.normalizer.period", 300000)); this.master = master; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java index fe10bd1..7ea6cc7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java @@ -27,8 +27,8 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.master.MasterServices; -import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType; -import org.apache.hadoop.hbase.util.Triple; +import org.apache.hadoop.hbase.normalizer.NormalizationPlan; +import org.apache.hadoop.hbase.normalizer.NormalizationPlan.PlanType; import 
java.util.ArrayList; import java.util.Collections; @@ -60,7 +60,7 @@ public class SimpleRegionNormalizer implements RegionNormalizer { private static final Log LOG = LogFactory.getLog(SimpleRegionNormalizer.class); private static final int MIN_REGION_COUNT = 3; private MasterServices masterServices; - private static long[] skippedCount = new long[NormalizationPlan.PlanType.values().length]; + private static long[] skippedCount = new long[PlanType.values().length]; /** * Set the master service. @@ -71,22 +71,6 @@ public class SimpleRegionNormalizer implements RegionNormalizer { this.masterServices = masterServices; } - /* - * This comparator compares the region size. - * The second element in the triple is region size while the 3rd element - * is the index of the region in the underlying List - */ - private Comparator> regionSizeComparator = - new Comparator>() { - @Override - public int compare(Triple pair, - Triple pair2) { - long sz = pair.getSecond(); - long sz2 = pair2.getSecond(); - return (sz < sz2) ? -1 : ((sz == sz2) ? 0 : 1); - } - }; - @Override public void planSkipped(HRegionInfo hri, PlanType type) { skippedCount[type.ordinal()]++; @@ -97,20 +81,38 @@ public class SimpleRegionNormalizer implements RegionNormalizer { return skippedCount[type.ordinal()]; } + // Comparator that gives higher priority to region Split plan + private Comparator planComparator = + new Comparator() { + @Override + public int compare(NormalizationPlan plan, NormalizationPlan plan2) { + if (plan instanceof SplitNormalizationPlan) { + return -1; + } + if (plan2 instanceof SplitNormalizationPlan) { + return 1; + } + return 0; + } + }; + /** * Computes next most "urgent" normalization action on the table. * Action may be either a split, or a merge, or no action. * * @param table table to normalize + * @param types desired types of NormalizationPlan * @return normalization plan to execute */ @Override - public NormalizationPlan computePlanForTable(TableName table) throws HBaseIOException { + public List computePlanForTable(TableName table, List types) + throws HBaseIOException { if (table == null || table.isSystemTable()) { LOG.debug("Normalization of system table " + table + " isn't allowed"); - return EmptyNormalizationPlan.getInstance(); + return null; } + List plans = new ArrayList(); List tableRegions = masterServices.getAssignmentManager().getRegionStates(). getRegionsOfTable(table); @@ -119,7 +121,7 @@ public class SimpleRegionNormalizer implements RegionNormalizer { int nrRegions = tableRegions == null ? 
0 : tableRegions.size(); LOG.debug("Table " + table + " has " + nrRegions + " regions, required min number" + " of regions for normalizer to run is " + MIN_REGION_COUNT + ", not running normalizer"); - return EmptyNormalizationPlan.getInstance(); + return null; } LOG.debug("Computing normalization plan for table: " + table + @@ -127,55 +129,49 @@ public class SimpleRegionNormalizer implements RegionNormalizer { long totalSizeMb = 0; - ArrayList> regionsWithSize = - new ArrayList>(tableRegions.size()); for (int i = 0; i < tableRegions.size(); i++) { HRegionInfo hri = tableRegions.get(i); long regionSize = getRegionSize(hri); - regionsWithSize.add(new Triple(hri, regionSize, i)); totalSizeMb += regionSize; } - Collections.sort(regionsWithSize, regionSizeComparator); - - Triple largestRegion = regionsWithSize.get(tableRegions.size()-1); double avgRegionSize = totalSizeMb / (double) tableRegions.size(); LOG.debug("Table " + table + ", total aggregated regions size: " + totalSizeMb); LOG.debug("Table " + table + ", average region size: " + avgRegionSize); - // now; if the largest region is >2 times large than average, we split it, split - // is more high priority normalization action than merge. - if (largestRegion.getSecond() > 2 * avgRegionSize) { - LOG.debug("Table " + table + ", largest region " - + largestRegion.getFirst().getRegionNameAsString() + " has size " - + largestRegion.getSecond() + ", more than 2 times than avg size, splitting"); - return new SplitNormalizationPlan(largestRegion.getFirst(), null); - } int candidateIdx = 0; - // look for two successive entries whose indices are adjacent - while (candidateIdx < tableRegions.size()-1) { - if (Math.abs(regionsWithSize.get(candidateIdx).getThird() - - regionsWithSize.get(candidateIdx + 1).getThird()) == 1) { - break; + while (candidateIdx < tableRegions.size()) { + HRegionInfo hri = tableRegions.get(candidateIdx); + long regionSize = getRegionSize(hri); + // if the region is > 2 times larger than average, we split it, split + // is more high priority normalization action than merge. 
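Aside: the heuristic described in that comment and implemented in the branch that follows can be illustrated with a standalone, plain-Java sketch. The region sizes below are hypothetical and nothing here is HBase API; it only mirrors the split/merge decision loop.

```java
/** Hypothetical sizes; mirrors the split/merge decision loop, not the real normalizer. */
public final class NormalizerHeuristicDemo {
  public static void main(String[] args) {
    long[] sizesMb = { 10, 1, 1, 50, 2 };               // per-region sizes in key order
    long totalMb = 0;
    for (long size : sizesMb) {
      totalMb += size;
    }
    double avgMb = totalMb / (double) sizesMb.length;   // 64 / 5 = 12.8
    for (int i = 0; i < sizesMb.length; i++) {
      if (sizesMb[i] > 2 * avgMb) {
        System.out.println("region " + i + ": SPLIT (" + sizesMb[i] + " > 2 * " + avgMb + ")");
      } else if (i < sizesMb.length - 1 && sizesMb[i] + sizesMb[i + 1] < avgMb) {
        System.out.println("regions " + i + " and " + (i + 1) + ": MERGE ("
            + (sizesMb[i] + sizesMb[i + 1]) + " < " + avgMb + ")");
        i++;                                            // the merged neighbour is consumed
      }
    }
  }
}
```

For these sizes the loop emits a merge plan for regions 0 and 1 (10 + 1 = 11 < 12.8) and a split plan for region 3 (50 > 25.6); the `planComparator` above then orders the split ahead of the merge before the plans are executed.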
+ if (types.contains(PlanType.SPLIT) && regionSize > 2 * avgRegionSize) { + LOG.debug("Table " + table + ", large region " + hri.getRegionNameAsString() + " has size " + + regionSize + ", more than twice avg size, splitting"); + plans.add(new SplitNormalizationPlan(hri, null)); + } else { + if (candidateIdx == tableRegions.size()-1) { + break; + } + HRegionInfo hri2 = tableRegions.get(candidateIdx+1); + long regionSize2 = getRegionSize(hri2); + if (types.contains(PlanType.MERGE) && regionSize + regionSize2 < avgRegionSize) { + LOG.debug("Table " + table + ", small region size: " + regionSize + + " plus its neighbor size: " + regionSize2 + + ", less than the avg size " + avgRegionSize + ", merging them"); + plans.add(new MergeNormalizationPlan(hri, hri2)); + candidateIdx++; + } } candidateIdx++; } - if (candidateIdx == tableRegions.size()-1) { - LOG.debug("No neighboring regions found for table: " + table); - return EmptyNormalizationPlan.getInstance(); - } - Triple candidateRegion = regionsWithSize.get(candidateIdx); - Triple candidateRegion2 = regionsWithSize.get(candidateIdx+1); - if (candidateRegion.getSecond() + candidateRegion2.getSecond() < avgRegionSize) { - LOG.debug("Table " + table + ", smallest region size: " + candidateRegion.getSecond() - + " and its smallest neighbor size: " + candidateRegion2.getSecond() - + ", less than the avg size, merging them"); - return new MergeNormalizationPlan(candidateRegion.getFirst(), - candidateRegion2.getFirst()); + if (plans.isEmpty()) { + LOG.debug("No normalization needed, regions look good for table: " + table); + return null; } - LOG.debug("No normalization needed, regions look good for table: " + table); - return EmptyNormalizationPlan.getInstance(); + Collections.sort(plans, planComparator); + return plans; } private long getRegionSize(HRegionInfo hri) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SplitNormalizationPlan.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SplitNormalizationPlan.java index b95bfb7..76b7cc2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SplitNormalizationPlan.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SplitNormalizationPlan.java @@ -23,7 +23,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType; +import org.apache.hadoop.hbase.normalizer.NormalizationPlan; import java.io.IOException; import java.util.Arrays; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischargeHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischargeHandler.java new file mode 100644 index 0000000..02160d8 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischargeHandler.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; + +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.executor.EventHandler; +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.regionserver.HStore; + +/** + * Event handler that handles the removal and archival of the compacted hfiles + */ +@InterfaceAudience.Private +public class CompactedHFilesDischargeHandler extends EventHandler { + + private HStore store; + + public CompactedHFilesDischargeHandler(Server server, EventType eventType, HStore store) { + super(server, eventType); + this.store = store; + } + + @Override + public void process() throws IOException { + this.store.closeAndArchiveCompactedFiles(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischarger.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischarger.java new file mode 100644 index 0000000..c4974cf --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischarger.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.Store; + +import com.google.common.annotations.VisibleForTesting; + +/** + * A chore service that periodically cleans up the compacted files when there are no active readers + * using those compacted files and also helps in clearing the block cache with these compacted + * file entries + */ +@InterfaceAudience.Private +public class CompactedHFilesDischarger extends ScheduledChore { + private static final Log LOG = LogFactory.getLog(CompactedHFilesDischarger.class); + private RegionServerServices regionServerServices; + // Default is to use executor + @VisibleForTesting + private boolean useExecutor = true; + + /** + * @param period the period of time to sleep between each run + * @param stopper the stopper + * @param regionServerServices the region server that starts this chore + */ + public CompactedHFilesDischarger(final int period, final Stoppable stopper, + final RegionServerServices regionServerServices) { + // Need to add the config classes + super("CompactedHFilesCleaner", stopper, period); + this.regionServerServices = regionServerServices; + } + + /** + * @param period the period of time to sleep between each run + * @param stopper the stopper + * @param regionServerServices the region server that starts this chore + * @param useExecutor true if to use the region server's executor service, false otherwise + */ + @VisibleForTesting + public CompactedHFilesDischarger(final int period, final Stoppable stopper, + final RegionServerServices regionServerServices, boolean useExecutor) { + // Need to add the config classes + this(period, stopper, regionServerServices); + this.useExecutor = useExecutor; + } + + @Override + public void chore() { + List onlineRegions = regionServerServices.getOnlineRegions(); + if (onlineRegions != null) { + for (Region region : onlineRegions) { + if (LOG.isTraceEnabled()) { + LOG.trace( + "Started the compacted hfiles cleaner for the region " + region.getRegionInfo()); + } + for (Store store : region.getStores()) { + try { + if (useExecutor && regionServerServices != null) { + CompactedHFilesDischargeHandler handler = new CompactedHFilesDischargeHandler( + (Server) regionServerServices, EventType.RS_COMPACTED_FILES_DISCHARGER, + (HStore) store); + regionServerServices.getExecutorService().submit(handler); + } else { + // call synchronously if the RegionServerServices are not + // available + store.closeAndArchiveCompactedFiles(); + } + if (LOG.isTraceEnabled()) { + LOG.trace("Completed archiving the compacted files for the region " + + region.getRegionInfo() + " under the store " + store.getColumnFamilyName()); + } + } catch (Exception e) { + LOG.error("Exception while trying to close and archive the comapcted store " + + "files of the store " + store.getColumnFamilyName() + " in the" + " region " + + region.getRegionInfo(), e); + } + } + if (LOG.isTraceEnabled()) { + LOG.trace( + "Completed the compacted hfiles cleaner for the region " + region.getRegionInfo()); + } + } + } + } +} diff 
--git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index e553fcc..36ab97a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -29,6 +29,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -74,7 +75,6 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.CompoundConfiguration; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DroppedSnapshotException; @@ -151,7 +151,6 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescripto import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; -import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory; @@ -814,20 +813,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Initialize all the HStores status.setStatus("Initializing all the Stores"); long maxSeqId = initializeStores(reporter, status); - // Start the CompactedHFilesDischarger here. This chore helps to remove the compacted files - // that will no longer be used in reads. - if (this.getRegionServerServices() != null) { - ChoreService choreService = this.getRegionServerServices().getChoreService(); - if (choreService != null) { - // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to - // 2 mins so that compacted files can be archived before the TTLCleaner runs - int cleanerInterval = - conf.getInt("hbase.hfile.compactions.cleaner.interval", 2 * 60 * 1000); - this.compactedFileDischarger = - new CompactedHFilesDischarger(cleanerInterval, this.getRegionServerServices(), this); - choreService.scheduleChore(compactedFileDischarger); - } - } this.mvcc.advanceTo(maxSeqId); if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) { // Recover any edits if available. @@ -1819,27 +1804,27 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * We are trying to remove / relax the region read lock for compaction. * Let's see what are the potential race conditions among the operations (user scan, * region split, region close and region bulk load). - * + * * user scan ---> region read lock * region split --> region close first --> region write lock * region close --> region write lock * region bulk load --> region write lock - * + * * read lock is compatible with read lock. ---> no problem with user scan/read * region bulk load does not cause problem for compaction (no consistency problem, store lock * will help the store file accounting). * They can run almost concurrently at the region level. 
- * + * * The only remaining race condition is between the region close and compaction. * So we will evaluate, below, how region close intervenes with compaction if compaction does * not acquire region read lock. - * + * * Here are the steps for compaction: * 1. obtain list of StoreFile's * 2. create StoreFileScanner's based on list from #1 * 3. perform compaction and save resulting files under tmp dir * 4. swap in compacted files - * + * * #1 is guarded by store lock. This patch does not change this --> no worse or better * For #2, we obtain smallest read point (for region) across all the Scanners (for both default * compactor and stripe compactor). @@ -1851,7 +1836,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * This will not conflict with compaction. * For #3, it can be performed in parallel to other operations. * For #4 bulk load and compaction don't conflict with each other on the region level - * (for multi-family atomicy). + * (for multi-family atomicy). * Region close and compaction are guarded pretty well by the 'writestate'. * In HRegion#doClose(), we have : * synchronized (writestate) { @@ -2575,6 +2560,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return getScanner(scan, true); } + @Override + public RegionScanner getScanner(Scan scan, List additionalScanners) + throws IOException { + return getScanner(scan, additionalScanners, true); + } + public RegionScanner getScanner(Scan scan, boolean copyCellsFromSharedMem) throws IOException { RegionScanner scanner = getScanner(scan, null, copyCellsFromSharedMem); return scanner; @@ -3668,27 +3659,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi int listSize = cells.size(); for (int i = 0; i < listSize; i++) { Cell cell = cells.get(i); - List newTags = new ArrayList(); - Iterator tagIterator = CellUtil.tagsIterator(cell); - - // Carry forward existing tags - - while (tagIterator.hasNext()) { - - // Add any filters or tag specific rewrites here - - newTags.add(tagIterator.next()); - } - - // Cell TTL handling - - // Check again if we need to add a cell TTL because early out logic - // above may change when there are more tag based features in core. - if (m.getTTL() != Long.MAX_VALUE) { - // Add a cell TTL tag - newTags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(m.getTTL()))); - } - + List newTags = TagUtil.carryForwardTags(null, cell); + newTags = carryForwardTTLTag(newTags, m); // Rewrite the cell with the updated set of tags cells.set(i, new TagRewriteCell(cell, TagUtil.fromList(newTags))); } @@ -7073,7 +7045,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi */ private static List carryForwardTags(final Cell cell, final List tags) { if (cell.getTagsLength() <= 0) return tags; - List newTags = tags == null? new ArrayList(): /*Append Tags*/tags; + List newTags = tags == null? new ArrayList(): /*Append Tags*/tags; Iterator i = CellUtil.tagsIterator(cell); while (i.hasNext()) newTags.add(i.next()); return newTags; @@ -7109,16 +7081,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // transactions, so all stores only go through one code path for puts. 
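Since the hunk above removes the per-region scheduling of the compacted-files chore from `HRegion.initialize()`, here is a hedged sketch of how the `CompactedHFilesDischarger` introduced earlier in this patch could instead be scheduled once per region server. The wiring method and its parameters are illustrative; the actual hook-up in `HRegionServer` is not shown in these hunks.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;

/** Illustrative wiring only; not part of this patch. */
final class CompactedFilesChoreWiring {

  static void schedule(Configuration conf, ChoreService choreService,
      Stoppable stopper, RegionServerServices rss) {
    // Same key and default as the code removed from HRegion: 2 minutes, so compacted
    // files can be archived before the (5 minute) TTL cleaner runs.
    int cleanerInterval =
        conf.getInt("hbase.hfile.compactions.cleaner.interval", 2 * 60 * 1000);
    CompactedHFilesDischarger discharger =
        new CompactedHFilesDischarger(cleanerInterval, stopper, rss);
    choreService.scheduleChore(discharger);
  }

  private CompactedFilesChoreWiring() {
  }
}
```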
@Override - public Result append(Append mutate, long nonceGroup, long nonce) throws IOException { + public Result append(Append append, long nonceGroup, long nonce) throws IOException { Operation op = Operation.APPEND; - byte[] row = mutate.getRow(); + byte[] row = append.getRow(); checkRow(row, op.toString()); - checkFamilies(mutate.getFamilyCellMap().keySet()); + checkFamilies(append.getFamilyCellMap().keySet()); boolean flush = false; - Durability durability = getEffectiveDurability(mutate.getDurability()); + Durability durability = getEffectiveDurability(append.getDurability()); boolean writeToWAL = durability != Durability.SKIP_WAL; WALEdit walEdits = null; - List allKVs = new ArrayList(mutate.size()); + List allKVs = new ArrayList(append.size()); Map> tempMemstore = new HashMap>(); long size = 0; long txid = 0; @@ -7141,14 +7113,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // (so that we are guaranteed to see the latest state when we do our Get) mvcc.await(); if (this.coprocessorHost != null) { - Result r = this.coprocessorHost.preAppendAfterRowLock(mutate); + Result r = this.coprocessorHost.preAppendAfterRowLock(append); if (r!= null) { return r; } } long now = EnvironmentEdgeManager.currentTime(); // Process each family - for (Map.Entry> family : mutate.getFamilyCellMap().entrySet()) { + for (Map.Entry> family : append.getFamilyCellMap().entrySet()) { Store store = stores.get(family.getKey()); List kvs = new ArrayList(family.getValue().size()); @@ -7170,12 +7142,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi long ts = Math.max(now, oldCell.getTimestamp()); // Process cell tags - // Make a union of the set of tags in the old and new KVs - List newTags = carryForwardTags(oldCell, new ArrayList()); - newTags = carryForwardTags(cell, newTags); - - // Cell TTL handling - + List tags = Tag.carryForwardTags(null, oldCell); + tags = Tag.carryForwardTags(tags, cell); + FIX + tags = carryForwardTTLTag(tags, append); if (mutate.getTTL() != Long.MAX_VALUE) { // Add the new TTL tag newTags.add( @@ -7183,13 +7153,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // Rebuild tags - byte[] tagBytes = TagUtil.fromList(newTags); + byte[] tagBytes = Tag.fromList(tags); // allocate an empty cell once newCell = new KeyValue(row.length, cell.getFamilyLength(), cell.getQualifierLength(), ts, KeyValue.Type.Put, oldCell.getValueLength() + cell.getValueLength(), - tagBytes.length); + tagBytes == null? 
0: tagBytes.length); // copy in row, family, and qualifier System.arraycopy(cell.getRowArray(), cell.getRowOffset(), newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength()); @@ -7206,8 +7176,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi newCell.getValueOffset() + oldCell.getValueLength(), cell.getValueLength()); // Copy in tag data - System.arraycopy(tagBytes, 0, newCell.getTagsArray(), newCell.getTagsOffset(), - tagBytes.length); + if (tagBytes != null) { + System.arraycopy(tagBytes, 0, newCell.getTagsArray(), newCell.getTagsOffset(), + tagBytes.length); + } idx++; } else { // Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP @@ -7215,7 +7187,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Cell TTL handling - if (mutate.getTTL() != Long.MAX_VALUE) { + if (append.getTTL() != Long.MAX_VALUE) { + // TODO + FIX + newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL()))); List newTags = new ArrayList(1); newTags.add( new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutate.getTTL()))); @@ -7229,7 +7204,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Give coprocessors a chance to update the new cell if (coprocessorHost != null) { newCell = coprocessorHost.postMutationBeforeWAL(RegionObserver.MutationType.APPEND, - mutate, oldCell, newCell); + append, oldCell, newCell); } kvs.add(newCell); @@ -7263,7 +7238,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi txid = this.wal.append(this.htableDescriptor, getRegionInfo(), walKey, walEdits, true); } else { - recordMutationWithoutWal(mutate.getFamilyCellMap()); + recordMutationWithoutWal(append.getFamilyCellMap()); } } if (walKey == null) { @@ -7336,11 +7311,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi requestFlush(); } - return mutate.isReturnResults() ? Result.create(allKVs) : null; - } - - public Result increment(Increment increment) throws IOException { - return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE); + return append.isReturnResults() ? Result.create(allKVs) : null; } // TODO: There's a lot of boiler plate code identical to append. @@ -7349,11 +7320,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // They are subtley different in quiet a few ways. This came out only // after study. I am not sure that many of the differences are intentional. 
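The TTL handling that the `TODO`/`FIX` placeholders above gesture at is driven by the per-mutation TTL: when a client sets one, `carryForwardTTLTag()` (defined later in this patch) attaches a `TTL_TAG_TYPE` tag to the rewritten cells. A hedged client-side sketch follows, assuming a standard `Table` handle; the row, family, qualifier, and one-hour TTL are made up for illustration.

```java
import java.io.IOException;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

/** Illustrative only; not part of this patch. */
final class AppendWithTtlExample {

  /** Append a suffix to a cell and ask the server to expire the result after one hour. */
  static Result appendWithTtl(Table table) throws IOException {
    Append append = new Append(Bytes.toBytes("row1"));
    append.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("-suffix"));
    // Long.MAX_VALUE (the default) means "no TTL tag"; anything else is turned into a
    // TTL tag on the server side via carryForwardTTLTag().
    append.setTTL(TimeUnit.HOURS.toMillis(1));
    return table.append(append);
  }

  private AppendWithTtlExample() {
  }
}
```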
- // TODO: St.Ack 20150907 + // TODO: St.Ack 20150907 - @Override - public Result increment(Increment mutation, long nonceGroup, long nonce) + public Result increment(Increment increment) throws IOException { + return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE); + } + + public Result increment(Increment increment, long nonceGroup, long nonce) throws IOException { +<<<<<<< HEAD Operation op = Operation.INCREMENT; byte [] row = mutation.getRow(); checkRow(row, op.toString()); @@ -7363,205 +7338,266 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi boolean writeToWAL = durability != Durability.SKIP_WAL; WALEdit walEdits = null; List allKVs = new ArrayList(mutation.size()); - + Map> tempMemstore = new HashMap>(); long size = 0; long txid = 0; +======= +>>>>>>> forward port checkReadOnly(); checkResources(); - // Lock row - startRegionOperation(op); + checkRow(increment.getRow(), "increment"); this.writeRequestsCount.increment(); - RowLock rowLock = null; WALKey walKey = null; - MultiVersionConcurrencyControl.WriteEntry writeEntry = null; - boolean doRollBackMemstore = false; - TimeRange tr = mutation.getTimeRange(); + startRegionOperation(Operation.INCREMENT); + RowLock rowLock = getRowLock(increment.getRow()); + // Accumulate in this List all we are to return to the Client. This Collection can be larger + // than what we apply to WAL and MemStore in case where an Increment comes in with an amount + // of zero. In this case, the Client just wants to know current state of Increment. + List results = new ArrayList(increment.size()); try { - rowLock = getRowLock(row); - assert rowLock != null; + lock(this.updatesLock.readLock()); try { - lock(this.updatesLock.readLock()); - try { - // wait for all prior MVCC transactions to finish - while we hold the row lock - // (so that we are guaranteed to see the latest state) - mvcc.await(); - if (this.coprocessorHost != null) { - Result r = this.coprocessorHost.preIncrementAfterRowLock(mutation); - if (r != null) { - return r; - } - } - long now = EnvironmentEdgeManager.currentTime(); - // Process each family - for (Map.Entry> family: mutation.getFamilyCellMap().entrySet()) { - Store store = stores.get(family.getKey()); - List kvs = new ArrayList(family.getValue().size()); - - List results = doGet(store, row, family, tr); - - // Iterate the input columns and update existing values if they were - // found, otherwise add new column initialized to the increment amount - - // Avoid as much copying as possible. We may need to rewrite and - // consolidate tags. Bytes are only copied once. 
- // Would be nice if KeyValue had scatter/gather logic - int idx = 0; - // HERE WE DIVERGE FROM APPEND - List edits = family.getValue(); - for (int i = 0; i < edits.size(); i++) { - Cell cell = edits.get(i); - long amount = Bytes.toLong(CellUtil.cloneValue(cell)); - boolean noWriteBack = (amount == 0); - - List newTags = carryForwardTags(cell, new ArrayList()); - - Cell c = null; - long ts = now; - if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), cell)) { - c = results.get(idx); - ts = Math.max(now, c.getTimestamp()); - if(c.getValueLength() == Bytes.SIZEOF_LONG) { - amount += CellUtil.getValueAsLong(c); - } else { - // throw DoNotRetryIOException instead of IllegalArgumentException - throw new org.apache.hadoop.hbase.DoNotRetryIOException( - "Attempted to increment field that isn't 64 bits wide"); - } - // Carry tags forward from previous version - newTags = carryForwardTags(c, newTags); - if (i < (edits.size() - 1) && !CellUtil.matchingQualifier(cell, edits.get(i + 1))) { - idx++; - } - } - - // Append new incremented KeyValue to list - byte[] q = CellUtil.cloneQualifier(cell); - byte[] val = Bytes.toBytes(amount); - - // Add the TTL tag if the mutation carried one - if (mutation.getTTL() != Long.MAX_VALUE) { - newTags.add( - new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutation.getTTL()))); - } - - Cell newKV = new KeyValue(row, 0, row.length, - family.getKey(), 0, family.getKey().length, - q, 0, q.length, - ts, - KeyValue.Type.Put, - val, 0, val.length, - newTags); - - // Give coprocessors a chance to update the new cell - if (coprocessorHost != null) { - newKV = coprocessorHost.postMutationBeforeWAL( - RegionObserver.MutationType.INCREMENT, mutation, c, newKV); - } - allKVs.add(newKV); - - if (!noWriteBack) { - kvs.add(newKV); - - // Prepare WAL updates - if (writeToWAL) { - if (walEdits == null) { - walEdits = new WALEdit(); - } - walEdits.add(newKV); - } - } - } - - //store the kvs to the temporary memstore before writing WAL - if (!kvs.isEmpty()) { - tempMemstore.put(store, kvs); - } - } - - // Actually write to WAL now - if (walEdits != null && !walEdits.isEmpty()) { - if (writeToWAL) { - // Using default cluster id, as this can only happen in the originating - // cluster. A slave cluster receives the final value (not the delta) - // as a Put. - // we use HLogKey here instead of WALKey directly to support legacy coprocessors. - walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), - WALKey.NO_SEQUENCE_ID, - nonceGroup, - nonce, - mvcc); - txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), - walKey, walEdits, true); - } else { - recordMutationWithoutWal(mutation.getFamilyCellMap()); - } - } - if (walKey == null) { - // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned - walKey = this.appendEmptyEdit(this.wal); - } - - // now start my own transaction - writeEntry = walKey.getWriteEntry(); - - // Actually write to Memstore now - if (!tempMemstore.isEmpty()) { - for (Map.Entry> entry : tempMemstore.entrySet()) { - Store store = entry.getKey(); - if (store.getFamily().getMaxVersions() == 1) { - // upsert if VERSIONS for this CF == 1 - // Is this right? It immediately becomes visible? 
St.Ack 20150907 - size += store.upsert(entry.getValue(), getSmallestReadPoint()); - } else { - // otherwise keep older versions around - for (Cell cell : entry.getValue()) { - CellUtil.setSequenceId(cell, writeEntry.getWriteNumber()); - size += store.add(cell); - doRollBackMemstore = true; - } - } - } - size = this.addAndGetGlobalMemstoreSize(size); - flush = isFlushSize(size); + if (this.coprocessorHost != null) { + Result r = this.coprocessorHost.preIncrementAfterRowLock(increment); + if (r != null) return r; + } + Durability effectiveDurability = getEffectiveDurability(increment.getDurability()); + // Complicated. This method does a few things. It returns three Collections of + // Increments; 1. what to apply to WAL (if any), 2. what to apply to MemStore, and 3. + // what to return to the Client. These Collections do not always agree dependent on whether + // we are writing the WAL and if amount to Increment is zero or not. + Map> forMemStoreByColumnFamily = + new HashMap>(increment.getFamilyCellMap().size()); + WALEdit walEdit = + getIncrementsToApply(increment, effectiveDurability, forMemStoreByColumnFamily, results); + + long txid = 0; + // Actually write to WAL now if a walEdit to apply. + if (walEdit != null && !walEdit.isEmpty()) { + // Using default cluster id, as this can only happen in the originating cluster. + // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey + // here instead of WALKey directly to support legacy coprocessors. + walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce, mvcc); + txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), + walKey, walEdit, true); + } else { + // If walEdits is empty, it means we skipped the WAL; update counters. + recordMutationWithoutWal(increment.getFamilyCellMap()); + } + if (walKey == null) { + // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned + walKey = this.appendEmptyEdit(this.wal); + } + // Call sync on our edit. + if (txid != 0) syncOrDefer(txid, effectiveDurability); + + // Now write to MemStore; do it a column family at a time.. + for (Map.Entry> entry: forMemStoreByColumnFamily.entrySet()) { + Store store = entry.getKey(); + List cells = entry.getValue(); + if (store.getFamily().getMaxVersions() == 1) { + // Upsert if VERSIONS for this CF == 1. Use write sequence id rather than read point + // when doing fast increment. + accumulatedResultSize += store.upsert(cells, walKey.getSequenceId()); + } else { + // Otherwise keep older versions around + for (Cell cell: cells) { + accumulatedResultSize += store.add(cell); } - } finally { - this.updatesLock.readLock().unlock(); } + // Tell mvcc this write is complete (The mvcc was begun down inside the FSHLog on append). 
+ this.mvcc.complete(walKey.getWriteEntry()); + // Clear the write entry to signify success (else it gets completed again in finally below) + walKey.setWriteEntry(null); } finally { - rowLock.release(); - rowLock = null; - } - // sync the transaction log outside the rowlock - if(txid != 0){ - syncOrDefer(txid, durability); + this.updatesLock.readLock().unlock(); } - doRollBackMemstore = false; + return Result.create(results); } finally { - if (rowLock != null) { - rowLock.release(); - } - // if the wal sync was unsuccessful, remove keys from memstore - if (doRollBackMemstore) { - for(List cells: tempMemstore.values()) { - rollbackMemstore(cells); + rowLock.release(); + if (walKey != null && walKey.getWriteEntry() != null) { + // Complete this even if in error so we keep the mvcc rolling forward. + this.mvcc.complete(walKey.getWriteEntry()); } - if (writeEntry != null) mvcc.complete(writeEntry); - } else if (writeEntry != null) { - mvcc.completeAndWait(writeEntry); - } + // Request a cache flush if over the limit. Do it outside update lock. + if (isFlushSize(this.addAndGetGlobalMemstoreSize(accumulatedResultSize))) requestFlush(); closeRegionOperation(Operation.INCREMENT); - if (this.metricsRegion != null) { - this.metricsRegion.updateIncrement(); + if (this.metricsRegion != null) this.metricsRegion.updateIncrement(); + } + } + + /** + * @return Sorted list of cells using comparator + */ + private static List sort(List cells, final Comparator comparator) { + Collections.sort(cells, comparator); + return cells; + } + + /** + * Calculate the increments to apply. + * @return A WALEdit to apply to WAL or null if we are to skip the WAL. Also fills out the + * passed in results to return to client and increments to + * add to forMemStore. These may not agree dependent on whether to write WAL or if + * the amount to increment is zero (in this case we write back nothing, just return latest value + * to the client). + * @throws IOException + */ + private WALEdit getIncrementsToApply(final Increment increment, + final Durability effectiveDurability, final Map> forMemStore, + final List results) + throws IOException { + long now = EnvironmentEdgeManager.currentTime(); + final boolean writeToWAL = effectiveDurability != Durability.SKIP_WAL; + // Process increments a Store/family at a time. + WALEdit walEdits = null; + for (Map.Entry> entry: increment.getFamilyCellMap().entrySet()) { + final byte [] columnFamilyName = entry.getKey(); + List increments = entry.getValue(); + Store store = this.stores.get(columnFamilyName); + // Do increment for this Store; be sure to 'sort' the increments first so increments + // match order in which we get back current Cells when we Get. + // TODO: We are doing a Get for each ColumnFamily/Store; why not Get them all in one go? + List toApply = getIncrementsToApplyToColumnFamily(increment, columnFamilyName, + sort(increments, store.getComparator()), now, results, IsolationLevel.READ_UNCOMMITTED); + if (!toApply.isEmpty()) { + forMemStore.put(store, toApply); + if (writeToWAL) { + if (walEdits == null) walEdits = new WALEdit(); + walEdits.getCells().addAll(toApply); + } } } + return walEdits; + } - if (flush) { - // Request a cache flush. Do it outside update lock. - requestFlush(); + /** + * Calculate the increments to apply on a column family. Does Get of current increment values + * and then adds passed in amounts returning result. If amount is zero, Client just wants to + * read current setting. In this case don't write to WAL or MemStore. 
Add current value to + * passed in results only. Otherwise, return new total for caller to apply. + * @param sortedIncrements The passed in increments to apply MUST be sorted so that they match + * the order that they appear in the Get results (get results will be sorted on return). + * Otherwise, we won't be able to find existing values if the cells are not specified in + * order by the client since cells are in an array list. + * @param isolation Isolation level to use when running the 'get'. Pass null for default. + * @param results In here we accumulate all the KVs we are to return to the client; this Set can + * be larger than what we return in case where amount to Increment is zero; i.e. don't write + * out new values, just return current state of increments. + * @return Resulting increments after sortedIncrements have been applied to current + * values. Side effect is our filling out of the results List. + * @throws IOException + */ + private List getIncrementsToApplyToColumnFamily(Increment increment, + byte[] columnFamilyName, List sortedIncrements, long now, + final List results, final IsolationLevel isolation) + throws IOException { + List toApply = new ArrayList(sortedIncrements.size()); + byte [] row = increment.getRow(); + // Get previous values for all columns in this family + List currentValues = + getIncrementCurrentValue(increment, columnFamilyName, sortedIncrements, isolation); + // Iterate the input columns and update existing values if they were found, otherwise + // add new column initialized to the increment amount + int idx = 0; + for (int i = 0; i < sortedIncrements.size(); i++) { + Cell inc = sortedIncrements.get(i); + long incrementAmount = getLongValue(inc); + // If increment amount == 0, then don't write this Increment to the WAL. + boolean writeBack = (incrementAmount != 0); + // Carry forward any tags that might have been added by a coprocessor. + List tags = Tag.carryForwardTags(inc); + + Cell currentValue = null; + long ts = now; + if (idx < currentValues.size() && CellUtil.matchingQualifier(currentValues.get(idx), inc)) { + currentValue = currentValues.get(idx); + ts = Math.max(now, currentValue.getTimestamp()); + incrementAmount += getLongValue(currentValue); + // Carry forward all tags + tags = Tag.carryForwardTags(tags, currentValue); + if (i < (sortedIncrements.size() - 1) && + !CellUtil.matchingQualifier(inc, sortedIncrements.get(i + 1))) { + idx++; + } + } + + // Append new incremented KeyValue to list + byte [] qualifier = CellUtil.cloneQualifier(inc); + byte [] incrementAmountInBytes = Bytes.toBytes(incrementAmount); + tags = carryForwardTTLTag(tags, increment); + + Cell newValue = new KeyValue(row, 0, row.length, + columnFamilyName, 0, columnFamilyName.length, + qualifier, 0, qualifier.length, + ts, KeyValue.Type.Put, + incrementAmountInBytes, 0, incrementAmountInBytes.length, + tags); + + // Give coprocessors a chance to update the new cell + if (coprocessorHost != null) { + newValue = coprocessorHost.postMutationBeforeWAL( + RegionObserver.MutationType.INCREMENT, increment, currentValue, newValue); + } + // If writeBack is true, we need to update memstore/WAL with new value. Otherwise, we are + // just to return the current value; add it to allKVs. + if (writeBack) toApply.add(newValue); + results.add(newValue); } - return mutation.isReturnResults() ? 
Result.create(allKVs) : null; + return toApply; + } + + /** + * @return Get the long out of the passed in Cell + * @throws DoNotRetryIOException + */ + private static long getLongValue(final Cell cell) throws DoNotRetryIOException { + int len = cell.getValueLength(); + if (len != Bytes.SIZEOF_LONG) { + // throw DoNotRetryIOException instead of IllegalArgumentException + throw new DoNotRetryIOException("Field is not a long, it's " + len + " bytes wide"); + } + return Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), len); + } + + /** + * Do a specific Get on passed columnFamily and column qualifiers + * from incrementCoordinates only. + * @param increment + * @param columnFamily + * @param incrementCoordinates + * @return Return the Cells to Increment + * @throws IOException + */ + private List getIncrementCurrentValue(final Increment increment, byte [] columnFamily, + final List increments, final IsolationLevel isolation) + throws IOException { + Get get = new Get(increment.getRow()); + if (isolation != null) get.setIsolationLevel(isolation); + for (Cell cell: increments) { + get.addColumn(columnFamily, CellUtil.cloneQualifier(cell)); + } + TimeRange tr = increment.getTimeRange(); + get.setTimeRange(tr.getMin(), tr.getMax()); + return get(get, false); + } + + /** + * @return Carry forward the TTL tag if the increment is carrying one + */ + private static List carryForwardTTLTag(final List tagsOrNull, + final Mutation mutation) { + long ttl = mutation.getTTL(); + if (ttl == Long.MAX_VALUE) return tagsOrNull; + List tags = tagsOrNull; + // If we are making the array in here, given we are the last thing checked, we'll be only thing + // in the array so set its size to '1' (I saw this being done in earlier version of + // tag-handling). + if (tags == null) tags = new ArrayList(1); + tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl))); + return tags; } // @@ -8166,7 +8202,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(), WALKey.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE, getMVCC()); - + // Call append but with an empty WALEdit. The returned sequence id will not be associated // with any edit and we can be sure it went in after all outstanding appends. 
try { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 00046ba..b2cc78a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -134,6 +134,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Repor import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse; import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler; @@ -484,6 +485,8 @@ public class HRegionServer extends HasThread implements RegionServerServices, La */ protected final ConfigurationManager configurationManager; + private CompactedHFilesDischarger compactedFileDischarger; + /** * Starts a HRegionServer at the default location. * @param conf @@ -615,6 +618,16 @@ public class HRegionServer extends HasThread implements RegionServerServices, La } }); } + // Create the CompactedFileDischarger chore service. This chore helps to + // remove the compacted files + // that will no longer be used in reads. + // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to + // 2 mins so that compacted files can be archived before the TTLCleaner runs + int cleanerInterval = + conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000); + this.compactedFileDischarger = + new CompactedHFilesDischarger(cleanerInterval, (Stoppable)this, (RegionServerServices)this); + choreService.scheduleChore(compactedFileDischarger); } protected TableDescriptors getFsTableDescriptors() throws IOException { @@ -1716,7 +1729,9 @@ public class HRegionServer extends HasThread implements RegionServerServices, La } this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt( "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS)); - + // Start the threads for compacted files discharger + this.service.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER, + conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10)); if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) { this.service.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS, conf.getInt("hbase.regionserver.region.replica.flusher.threads", @@ -2725,6 +2740,15 @@ public class HRegionServer extends HasThread implements RegionServerServices, La return tableRegions; } + @Override + public List getOnlineRegions() { + List allRegions = new ArrayList(); + synchronized (this.onlineRegions) { + // Return a clone copy of the onlineRegions + allRegions.addAll(onlineRegions.values()); + } + return allRegions; + } /** * Gets the online tables in this RS. * This method looks at the in-memory onlineRegions. 
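The hunks above schedule the new CompactedHFilesDischarger chore and size its RS_COMPACTED_FILES_DISCHARGER executor pool from configuration. A minimal sketch of tuning those two knobs, using only the property names introduced by this patch (the values chosen are illustrative assumptions, not recommendations):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;

public class DischargerTuningSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // How often the discharger chore runs; the patch defaults this to 2 minutes so compacted
    // files are archived before the HFile TTL cleaner (default 5 minutes) considers them.
    conf.setInt("hbase.hfile.compaction.discharger.interval", 60 * 1000);
    // Number of executor threads servicing compacted-file discharge work; the patch default is 10.
    conf.setInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 4);
    System.out.println("discharger interval ms = "
        + conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000));
  }
}
```

Whatever interval is chosen, keeping it comfortably below the TTL cleaner period preserves the ordering the comment in the hunk above relies on.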
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 8d66696..9ebdaee 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -37,9 +37,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Future; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -76,7 +74,6 @@ import org.apache.hadoop.hbase.io.hfile.InvalidHFileException; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; @@ -91,7 +88,6 @@ import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ReflectionUtils; -import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; @@ -139,8 +135,6 @@ public class HStore implements Store { static int closeCheckInterval = 0; private volatile long storeSize = 0L; private volatile long totalUncompressedBytes = 0L; - private ThreadPoolExecutor compactionCleanerthreadPoolExecutor = null; - private CompletionService completionService = null; /** * RWLock for store operations. @@ -274,10 +268,6 @@ public class HStore implements Store { "hbase.hstore.flush.retries.number must be > 0, not " + flushRetriesNumber); } - compactionCleanerthreadPoolExecutor = getThreadPoolExecutor( - conf.getInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 10)); - completionService = - new ExecutorCompletionService(compactionCleanerthreadPoolExecutor); cryptoContext = EncryptionUtil.createEncryptionContext(conf, family); } @@ -802,7 +792,9 @@ public class HStore implements Store { Collection compactedfiles = storeEngine.getStoreFileManager().clearCompactedFiles(); // clear the compacted files - removeCompactedFiles(compactedfiles); + if (compactedfiles != null && !compactedfiles.isEmpty()) { + removeCompactedfiles(compactedfiles); + } if (!result.isEmpty()) { // initialize the thread pool for closing store files in parallel. 
ThreadPoolExecutor storeFileCloserThreadPool = this.region @@ -844,9 +836,6 @@ public class HStore implements Store { } if (ioe != null) throw ioe; } - if (compactionCleanerthreadPoolExecutor != null) { - compactionCleanerthreadPoolExecutor.shutdownNow(); - } LOG.info("Closed " + this); return result; } finally { @@ -2174,7 +2163,7 @@ public class HStore implements Store { } public static final long FIXED_OVERHEAD = - ClassSize.align(ClassSize.OBJECT + (18 * ClassSize.REFERENCE) + (10 * Bytes.SIZEOF_LONG) + ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (10 * Bytes.SIZEOF_LONG) + (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN)); public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD @@ -2311,92 +2300,72 @@ public class HStore implements Store { } finally { lock.readLock().unlock(); } - removeCompactedFiles(copyCompactedfiles); - } - - private ThreadPoolExecutor getThreadPoolExecutor(int maxThreads) { - return Threads.getBoundedCachedThreadPool(maxThreads, maxThreads * 3, TimeUnit.SECONDS, - new ThreadFactory() { - private int count = 1; - - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "CompactedfilesArchiver-" + count++); - } - }); + if (copyCompactedfiles != null && !copyCompactedfiles.isEmpty()) { + removeCompactedfiles(copyCompactedfiles); + } } - private void removeCompactedFiles(Collection compactedfiles) throws IOException { - if (compactedfiles != null && !compactedfiles.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Removing the compacted store files " + compactedfiles); - } - for (final StoreFile file : compactedfiles) { - completionService.submit(new Callable() { - @Override - public StoreFile call() throws IOException { - synchronized (file) { - try { - StoreFile.Reader r = file.getReader(); - if (r == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("The file " + file + " was closed but still not archived."); - } - return file; - } - if (r != null && r.isCompactedAway() && !r.isReferencedInReads()) { - // Even if deleting fails we need not bother as any new scanners won't be - // able to use the compacted file as the status is already compactedAway - if (LOG.isTraceEnabled()) { - LOG.trace("Closing and archiving the file " + file.getPath()); - } - r.close(true); - // Just close and return - return file; - } - } catch (Exception e) { - LOG.error("Exception while trying to close the compacted store file " - + file.getPath().getName()); - } + /** + * Archives and removes the compacted files + * @param compactedfiles The compacted files in this store that are not active in reads + * @throws IOException + */ + private void removeCompactedfiles(Collection compactedfiles) + throws IOException { + final List filesToRemove = new ArrayList(compactedfiles.size()); + for (final StoreFile file : compactedfiles) { + synchronized (file) { + try { + StoreFile.Reader r = file.getReader(); + if (r == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("The file " + file + " was closed but still not archived."); } - return null; + filesToRemove.add(file); } - }); - } - final List filesToRemove = new ArrayList(compactedfiles.size()); - try { - for (final StoreFile file : compactedfiles) { - Future future = completionService.take(); - StoreFile closedFile = future.get(); - if (closedFile != null) { - filesToRemove.add(closedFile); + if (r != null && r.isCompactedAway() && !r.isReferencedInReads()) { + // Even if deleting fails we need not bother as any new scanners won't be + // able to use the compacted file as the status is 
already compactedAway + if (LOG.isTraceEnabled()) { + LOG.trace("Closing and archiving the file " + file.getPath()); + } + r.close(true); + // Just close and return + filesToRemove.add(file); } + } catch (Exception e) { + LOG.error( + "Exception while trying to close the compacted store file " + file.getPath().getName()); } - } catch (InterruptedException ie) { - LOG.error("Interrupted exception while closing the compacted files", ie); - } catch (Exception e) { - LOG.error("Exception occured while closing the compacted files", e); } - if (isPrimaryReplicaStore()) { - archiveAndRemoveCompactedFiles(filesToRemove); + } + if (this.isPrimaryReplicaStore()) { + // Only the primary region is allowed to move the file to archive. + // The secondary region does not move the files to archive. Any active reads from + // the secondary region will still work because the file as such has active readers on it. + if (!filesToRemove.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Moving the files " + filesToRemove + " to archive"); + } + // Only if this is successful it has to be removed + this.fs.removeStoreFiles(this.getFamily().getNameAsString(), filesToRemove); } - + } + if (!filesToRemove.isEmpty()) { + // Clear the compactedfiles from the store file manager + clearCompactedfiles(filesToRemove); } } - private void archiveAndRemoveCompactedFiles(List filesToArchive) throws IOException { - if (!filesToArchive.isEmpty()) { - if (LOG.isTraceEnabled()) { - LOG.trace("Moving the files " + filesToArchive + " to archive"); - } - // Only if this is successful it has to be removed - this.fs.removeStoreFiles(this.getFamily().getNameAsString(), filesToArchive); - try { - lock.writeLock().lock(); - this.getStoreEngine().getStoreFileManager().removeCompactedFiles(filesToArchive); - } finally { - lock.writeLock().unlock(); - } + private void clearCompactedfiles(final List filesToRemove) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Clearing the compacted file " + filesToRemove + " from this store"); + } + try { + lock.writeLock().lock(); + this.getStoreEngine().getStoreFileManager().removeCompactedFiles(filesToRemove); + } finally { + lock.writeLock().unlock(); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java index 60fc9fb..310108c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java @@ -67,4 +67,10 @@ public interface OnlineRegions extends Server { * @throws java.io.IOException */ List getOnlineRegions(TableName tableName) throws IOException; + + /** + * Get all online regions in this RS. + * @return List of online Region + */ + List getOnlineRegions(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 6d87057..5da8bcb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -392,6 +392,21 @@ public interface Region extends ConfigurationObserver { */ RegionScanner getScanner(Scan scan) throws IOException; + /** + * Return an iterator that scans over the HRegion, returning the indicated columns and rows + * specified by the {@link Scan}. 
The scanner will also include the additional scanners passed + * along with the scanners for the specified Scan instance. Should be careful with the usage to + * pass additional scanners only within this Region + *
* <p>
+ * This Iterator must be closed by the caller. + * + * @param scan configured {@link Scan} + * @param additionalScanners Any additional scanners to be used + * @return RegionScanner + * @throws IOException read exceptions + */ + RegionScanner getScanner(Scan scan, List additionalScanners) throws IOException; + /** The comparator to be used with the region */ CellComparator getCellCompartor(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactedHFilesDischarger.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactedHFilesDischarger.java deleted file mode 100644 index 4cf120d..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactedHFilesDischarger.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver.compactions; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.ScheduledChore; -import org.apache.hadoop.hbase.Stoppable; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.regionserver.Region; -import org.apache.hadoop.hbase.regionserver.Store; - -/** - * A chore service that periodically cleans up the compacted files when there are no active readers - * using those compacted files and also helps in clearing the block cache with these compacted - * file entries - */ -@InterfaceAudience.Private -public class CompactedHFilesDischarger extends ScheduledChore { - private static final Log LOG = LogFactory.getLog(CompactedHFilesDischarger.class); - private Region region; - - /** - * @param period the period of time to sleep between each run - * @param stopper the stopper - * @param region the store to identify the family name - */ - public CompactedHFilesDischarger(final int period, final Stoppable stopper, final Region region) { - // Need to add the config classes - super("CompactedHFilesCleaner", stopper, period); - this.region = region; - } - - @Override - public void chore() { - if (LOG.isTraceEnabled()) { - LOG.trace( - "Started the compacted hfiles cleaner for the region " + this.region.getRegionInfo()); - } - for (Store store : region.getStores()) { - try { - store.closeAndArchiveCompactedFiles(); - if (LOG.isTraceEnabled()) { - LOG.trace("Completed archiving the compacted files for the region " - + this.region.getRegionInfo() + " under the store " + store.getColumnFamilyName()); - } - } catch (Exception e) { - LOG.error( - "Exception while trying to close and archive the comapcted store files of the store " - + store.getColumnFamilyName() + " in the region " + this.region.getRegionInfo(), - e); - } - } - if 
(LOG.isTraceEnabled()) { - LOG.trace( - "Completed the compacted hfiles cleaner for the region " + this.region.getRegionInfo()); - } - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java index 62e7c7c..633477e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java @@ -64,6 +64,9 @@ public class CompactionConfiguration { public static final String HBASE_HSTORE_MIN_LOCALITY_TO_SKIP_MAJOR_COMPACT = "hbase.hstore.min.locality.to.skip.major.compact"; + public static final String HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT = + "hbase.hfile.compaction.discharger.thread.count"; + Configuration conf; StoreConfigInformation storeConfigInfo; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java index a7fc75b..0986ad7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java @@ -110,6 +110,11 @@ public class MockRegionServerServices implements RegionServerServices { } @Override + public List getOnlineRegions() { + return null; + } + + @Override public void addToOnlineRegions(Region r) { this.regions.put(r.getRegionInfo().getEncodedName(), r); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java index 55e43de..64139ee 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.backup.example; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.IOException; import java.util.ArrayList; @@ -42,10 +44,11 @@ import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate; import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; +import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.Store; -import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.util.Bytes; @@ -79,6 +82,7 @@ public class TestZooKeeperTableArchiveClient { private static ZKTableArchiveClient archivingClient; private final List toCleanup = new ArrayList(); private static ClusterConnection CONNECTION; + private static RegionServerServices rss; /** * Setup the config for the cluster @@ -93,6 
+97,7 @@ public class TestZooKeeperTableArchiveClient { ZooKeeperWatcher watcher = UTIL.getZooKeeperWatcher(); String archivingZNode = ZKTableArchiveClient.getArchiveZNode(UTIL.getConfiguration(), watcher); ZKUtil.createWithParents(watcher, archivingZNode); + rss = mock(RegionServerServices.class); } private static void setupConf(Configuration conf) { @@ -173,8 +178,11 @@ public class TestZooKeeperTableArchiveClient { // create the region HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM); HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd); + List regions = new ArrayList(); + regions.add(region); + when(rss.getOnlineRegions()).thenReturn(regions); final CompactedHFilesDischarger compactionCleaner = - new CompactedHFilesDischarger(100, stop, region); + new CompactedHFilesDischarger(100, stop, rss, false); loadFlushAndCompact(region, TEST_FAM); compactionCleaner.chore(); // get the current hfiles in the archive directory @@ -223,15 +231,21 @@ public class TestZooKeeperTableArchiveClient { // create the region HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM); HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd); + List regions = new ArrayList(); + regions.add(region); + when(rss.getOnlineRegions()).thenReturn(regions); final CompactedHFilesDischarger compactionCleaner = - new CompactedHFilesDischarger(100, stop, region); + new CompactedHFilesDischarger(100, stop, rss, false); loadFlushAndCompact(region, TEST_FAM); compactionCleaner.chore(); // create the another table that we don't archive hcd = new HColumnDescriptor(TEST_FAM); HRegion otherRegion = UTIL.createTestRegion(otherTable, hcd); - final CompactedHFilesDischarger compactionCleaner1 = - new CompactedHFilesDischarger(100, stop, otherRegion); + regions = new ArrayList(); + regions.add(otherRegion); + when(rss.getOnlineRegions()).thenReturn(regions); + final CompactedHFilesDischarger compactionCleaner1 = new CompactedHFilesDischarger(100, stop, + rss, false); loadFlushAndCompact(otherRegion, TEST_FAM); compactionCleaner1.chore(); // get the current hfiles in the archive directory diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index 8734aea..63d9cd0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -48,7 +48,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; @@ -119,6 +118,7 @@ import org.junit.experimental.categories.Category; @Category({LargeTests.class, ClientTests.class}) @SuppressWarnings ("deprecation") public class TestFromClientSide { + // NOTE: Increment tests were moved to their own class, TestIncrementsFromClientSide. 
private static final Log LOG = LogFactory.getLog(TestFromClientSide.class); protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static byte [] ROW = Bytes.toBytes("testRow"); @@ -3046,7 +3046,7 @@ public class TestFromClientSide { equals(value, CellUtil.cloneValue(key))); } - private void assertIncrementKey(Cell key, byte [] row, byte [] family, + static void assertIncrementKey(Cell key, byte [] row, byte [] family, byte [] qualifier, long value) throws Exception { assertTrue("Expected row [" + Bytes.toString(row) + "] " + @@ -3270,7 +3270,7 @@ public class TestFromClientSide { return stamps; } - private boolean equals(byte [] left, byte [] right) { + static boolean equals(byte [] left, byte [] right) { if (left == null && right == null) return true; if (left == null && right.length == 0) return true; if (right == null && left.length == 0) return true; @@ -4399,264 +4399,6 @@ public class TestFromClientSide { } @Test - public void testIncrementWithDeletes() throws Exception { - LOG.info("Starting testIncrementWithDeletes"); - final TableName TABLENAME = - TableName.valueOf("testIncrementWithDeletes"); - Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY); - final byte[] COLUMN = Bytes.toBytes("column"); - - ht.incrementColumnValue(ROW, FAMILY, COLUMN, 5); - TEST_UTIL.flush(TABLENAME); - - Delete del = new Delete(ROW); - ht.delete(del); - - ht.incrementColumnValue(ROW, FAMILY, COLUMN, 5); - - Get get = new Get(ROW); - Result r = ht.get(get); - assertEquals(1, r.size()); - assertEquals(5, Bytes.toLong(r.getValue(FAMILY, COLUMN))); - } - - @Test - public void testIncrementingInvalidValue() throws Exception { - LOG.info("Starting testIncrementingInvalidValue"); - final TableName TABLENAME = TableName.valueOf("testIncrementingInvalidValue"); - Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY); - final byte[] COLUMN = Bytes.toBytes("column"); - Put p = new Put(ROW); - // write an integer here (not a Long) - p.addColumn(FAMILY, COLUMN, Bytes.toBytes(5)); - ht.put(p); - try { - ht.incrementColumnValue(ROW, FAMILY, COLUMN, 5); - fail("Should have thrown DoNotRetryIOException"); - } catch (DoNotRetryIOException iox) { - // success - } - Increment inc = new Increment(ROW); - inc.addColumn(FAMILY, COLUMN, 5); - try { - ht.increment(inc); - fail("Should have thrown DoNotRetryIOException"); - } catch (DoNotRetryIOException iox) { - // success - } - } - - @Test - public void testIncrementInvalidArguments() throws Exception { - LOG.info("Starting testIncrementInvalidArguments"); - final TableName TABLENAME = TableName.valueOf("testIncrementInvalidArguments"); - Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY); - final byte[] COLUMN = Bytes.toBytes("column"); - try { - // try null row - ht.incrementColumnValue(null, FAMILY, COLUMN, 5); - fail("Should have thrown IOException"); - } catch (IOException iox) { - // success - } - try { - // try null family - ht.incrementColumnValue(ROW, null, COLUMN, 5); - fail("Should have thrown IOException"); - } catch (IOException iox) { - // success - } - try { - // try null qualifier - ht.incrementColumnValue(ROW, FAMILY, null, 5); - fail("Should have thrown IOException"); - } catch (IOException iox) { - // success - } - // try null row - try { - Increment incNoRow = new Increment((byte [])null); - incNoRow.addColumn(FAMILY, COLUMN, 5); - fail("Should have thrown IllegalArgumentException"); - } catch (IllegalArgumentException iax) { - // success - } catch (NullPointerException npe) { - // success - } - // try null family - try { 
- Increment incNoFamily = new Increment(ROW); - incNoFamily.addColumn(null, COLUMN, 5); - fail("Should have thrown IllegalArgumentException"); - } catch (IllegalArgumentException iax) { - // success - } - // try null qualifier - try { - Increment incNoQualifier = new Increment(ROW); - incNoQualifier.addColumn(FAMILY, null, 5); - fail("Should have thrown IllegalArgumentException"); - } catch (IllegalArgumentException iax) { - // success - } - } - - @Test - public void testIncrementOutOfOrder() throws Exception { - LOG.info("Starting testIncrementOutOfOrder"); - final TableName TABLENAME = TableName.valueOf("testIncrementOutOfOrder"); - Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY); - - byte [][] QUALIFIERS = new byte [][] { - Bytes.toBytes("B"), Bytes.toBytes("A"), Bytes.toBytes("C") - }; - - Increment inc = new Increment(ROW); - for (int i=0; i gets = new ArrayList(); for (byte[] k : KEYS) { Get get = new Get(k); get.addColumn(BYTES_FAMILY, QUALIFIER); - Result r = table.get(get); - Assert.assertTrue(r.containsColumn(BYTES_FAMILY, QUALIFIER)); - Assert.assertEquals(0, Bytes.compareTo(VALUE, r - .getValue(BYTES_FAMILY, QUALIFIER))); + gets.add(get); + } + int retryNum = 10; + Result[] results = null; + do { + results = table.get(gets); + boolean finished = true; + for (Result result : results) { + if (result.isEmpty()) { + finished = false; + break; + } + } + if (finished) { + break; + } + try { + Thread.sleep(10); + } catch (InterruptedException e) { + } + retryNum--; + } while (retryNum > 0); + + if (retryNum == 0) { + fail("Timeout for validate data"); + } else { + if (results != null) { + for (Result r : results) { + Assert.assertTrue(r.containsColumn(BYTES_FAMILY, QUALIFIER)); + Assert.assertEquals(0, Bytes.compareTo(VALUE, r + .getValue(BYTES_FAMILY, QUALIFIER))); + } + LOG.info("Validating data on " + table + " successfully!"); + } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index 234ad20..32f644b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -462,6 +462,11 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { } @Override + public List getOnlineRegions() { + return null; + } + + @Override public OpenRegionResponse openRegion(RpcController controller, OpenRegionRequest request) throws ServiceException { // TODO Auto-generated method stub diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterMetricsWrapper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterMetricsWrapper.java index 2df4ac9..02f3721 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterMetricsWrapper.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterMetricsWrapper.java @@ -50,6 +50,8 @@ public class TestMasterMetricsWrapper { public void testInfo() { HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); MetricsMasterWrapperImpl info = new MetricsMasterWrapperImpl(master); + assertEquals(master.getSplitPlanCount(), info.getSplitPlanCount(), 0); + assertEquals(master.getMergePlanCount(), info.getMergePlanCount(), 0); assertEquals(master.getAverageLoad(), info.getAverageLoad(), 0); assertEquals(master.getClusterId(), info.getClusterId()); assertEquals(master.getMasterActiveTime(), info.getActiveTime()); diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java index 60c5473..a6b6e4c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java @@ -46,11 +46,10 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnaps import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse; +import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger; import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy; import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.HStore; -import org.apache.hadoop.hbase.regionserver.Store; -import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger; +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil; import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; @@ -60,6 +59,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -324,10 +324,17 @@ public class TestSnapshotFromMaster { region.waitForFlushesAndCompactions(); // enable can trigger a compaction, wait for it. 
region.compactStores(); // min is 2 so will compact and archive } - for (HRegion region : regions) { - CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, region); - cleaner.chore(); + List regionServerThreads = UTIL.getMiniHBaseCluster() + .getRegionServerThreads(); + HRegionServer hrs = null; + for (RegionServerThread rs : regionServerThreads) { + if (!rs.getRegionServer().getOnlineRegions(TABLE_NAME).isEmpty()) { + hrs = rs.getRegionServer(); + break; + } } + CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, hrs, false); + cleaner.chore(); LOG.info("After compaction File-System state"); FSUtils.logFileSystemState(fs, rootDir, LOG); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java index 970af43..1f66044 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java @@ -26,6 +26,8 @@ import org.apache.hadoop.hbase.RegionLoad; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.normalizer.NormalizationPlan; +import org.apache.hadoop.hbase.normalizer.NormalizationPlan.PlanType; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; @@ -53,6 +55,18 @@ public class TestSimpleRegionNormalizer { private static final Log LOG = LogFactory.getLog(TestSimpleRegionNormalizer.class); private static RegionNormalizer normalizer; + private static List bothTypes; + static { + bothTypes = new ArrayList<>(); + bothTypes.add(PlanType.SPLIT); + bothTypes.add(PlanType.MERGE); + } + + private static List splitType; + static { + splitType = new ArrayList<>(); + splitType.add(PlanType.SPLIT); + } // mocks private static MasterServices masterServices; @@ -69,8 +83,8 @@ public class TestSimpleRegionNormalizer { Map regionSizes = new HashMap<>(); setupMocksForNormalizer(regionSizes, hris); - NormalizationPlan plan = normalizer.computePlanForTable(testTable); - assertTrue(plan instanceof EmptyNormalizationPlan); + List plans = normalizer.computePlanForTable(testTable, bothTypes); + assertTrue(plans == null); } @Test @@ -88,8 +102,8 @@ public class TestSimpleRegionNormalizer { regionSizes.put(hri2.getRegionName(), 15); setupMocksForNormalizer(regionSizes, hris); - NormalizationPlan plan = normalizer.computePlanForTable(testTable); - assertTrue((plan instanceof EmptyNormalizationPlan)); + List plans = normalizer.computePlanForTable(testTable, bothTypes); + assertTrue(plans == null); } @Test @@ -114,14 +128,18 @@ public class TestSimpleRegionNormalizer { hris.add(hri4); regionSizes.put(hri4.getRegionName(), 10); - setupMocksForNormalizer(regionSizes, hris); - NormalizationPlan plan = normalizer.computePlanForTable(testTable); - assertTrue(plan instanceof EmptyNormalizationPlan); + List plans = normalizer.computePlanForTable(testTable, bothTypes); + assertTrue(plans == null); } @Test public void testMergeOfSmallRegions() throws HBaseIOException { + testMergeOfSmallRegions(true); + testMergeOfSmallRegions(false); + } + + public void testMergeOfSmallRegions(boolean mergeDesired) throws HBaseIOException { TableName testTable 
= TableName.valueOf("testMergeOfSmallRegions"); List hris = new ArrayList<>(); Map regionSizes = new HashMap<>(); @@ -147,11 +165,17 @@ public class TestSimpleRegionNormalizer { regionSizes.put(hri5.getRegionName(), 16); setupMocksForNormalizer(regionSizes, hris); - NormalizationPlan plan = normalizer.computePlanForTable(testTable); - - assertTrue(plan instanceof MergeNormalizationPlan); - assertEquals(hri2, ((MergeNormalizationPlan) plan).getFirstRegion()); - assertEquals(hri3, ((MergeNormalizationPlan) plan).getSecondRegion()); + List plans = normalizer.computePlanForTable(testTable, + mergeDesired ? bothTypes : splitType); + + if (mergeDesired) { + NormalizationPlan plan = plans.get(0); + assertTrue(plan instanceof MergeNormalizationPlan); + assertEquals(hri2, ((MergeNormalizationPlan) plan).getFirstRegion()); + assertEquals(hri3, ((MergeNormalizationPlan) plan).getSecondRegion()); + } else { + assertTrue(plans == null); + } } // Test for situation illustrated in HBASE-14867 @@ -186,7 +210,8 @@ public class TestSimpleRegionNormalizer { regionSizes.put(hri6.getRegionName(), 2700); setupMocksForNormalizer(regionSizes, hris); - NormalizationPlan plan = normalizer.computePlanForTable(testTable); + List plans = normalizer.computePlanForTable(testTable, bothTypes); + NormalizationPlan plan = plans.get(0); assertTrue(plan instanceof MergeNormalizationPlan); assertEquals(hri5, ((MergeNormalizationPlan) plan).getFirstRegion()); @@ -220,9 +245,9 @@ public class TestSimpleRegionNormalizer { regionSizes.put(hri5.getRegionName(), 5); setupMocksForNormalizer(regionSizes, hris); - NormalizationPlan plan = normalizer.computePlanForTable(testTable); + List plans = normalizer.computePlanForTable(testTable, bothTypes); - assertTrue(plan instanceof EmptyNormalizationPlan); + assertTrue(plans == null); } @Test @@ -248,7 +273,8 @@ public class TestSimpleRegionNormalizer { regionSizes.put(hri4.getRegionName(), 30); setupMocksForNormalizer(regionSizes, hris); - NormalizationPlan plan = normalizer.computePlanForTable(testTable); + List plans = normalizer.computePlanForTable(testTable, bothTypes); + NormalizationPlan plan = plans.get(0); assertTrue(plan instanceof SplitNormalizationPlan); assertEquals(hri4, ((SplitNormalizationPlan) plan).getRegionInfo()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizerOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizerOnCluster.java index 4fe42ed..801e92d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizerOnCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizerOnCluster.java @@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.TableNamespaceManager; -import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType; +import org.apache.hadoop.hbase.normalizer.NormalizationPlan.PlanType; import org.apache.hadoop.hbase.namespace.TestNamespaceAuditor; import org.apache.hadoop.hbase.quotas.QuotaUtil; import org.apache.hadoop.hbase.regionserver.HRegion; @@ -137,7 +137,7 @@ public class TestSimpleRegionNormalizerOnCluster { } HTableDescriptor htd = admin.getTableDescriptor(TABLENAME); - htd.setNormalizationEnabled(true); + htd.setNormalizationMode("MS"); admin.modifyTable(TABLENAME, htd); 
admin.flush(TABLENAME); @@ -155,11 +155,19 @@ public class TestSimpleRegionNormalizerOnCluster { } while (skippedSplitcnt == 0L); assert(skippedSplitcnt > 0); } else { - while (MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), TABLENAME) < 6) { - LOG.info("Waiting for normalization split to complete"); - Thread.sleep(100); + while (true) { + List regions = TEST_UTIL.getHBaseCluster().getRegions(TABLENAME); + int cnt = 0; + for (HRegion region : regions) { + String regionName = region.getRegionInfo().getRegionNameAsString(); + if (regionName.startsWith("testRegionNormalizationSplitOnCluster,zzzzz")) { + cnt++; + } + } + if (cnt >= 2) { + break; + } } - assertEquals(6, MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), TABLENAME)); } admin.disableTable(TABLENAME); @@ -207,7 +215,7 @@ public class TestSimpleRegionNormalizerOnCluster { } HTableDescriptor htd = admin.getTableDescriptor(TABLENAME); - htd.setNormalizationEnabled(true); + htd.setNormalizationMode("MS"); admin.modifyTable(TABLENAME, htd); admin.flush(TABLENAME); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java index d15a7f4..fb8ea04 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java @@ -19,8 +19,8 @@ package org.apache.hadoop.hbase.regionserver; import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; @@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.IsolationLevel; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; @@ -65,7 +66,6 @@ import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL; import org.junit.After; import org.junit.Before; @@ -182,13 +182,14 @@ public class TestAtomicOperation { * Test multi-threaded increments. */ @Test - public void testIncrementMultiThreads() throws IOException { + public void testIncrementMultiThreads(final boolean fast) throws IOException { LOG.info("Starting test testIncrementMultiThreads"); // run a with mixed column families (1 and 3 versions) initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2); - // create 25 threads, each will increment by its own quantity - int numThreads = 25; + // Create 100 threads, each will increment by its own quantity. All 100 threads update the + // same row over two column families. 
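+    // One column family is created with VERSIONS=1 and the other with VERSIONS=3 (see the
+    // initHRegion call above), so these threads exercise both the Store upsert branch
+    // (max versions == 1) and the plain Store add branch of the rewritten increment path.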
+ int numThreads = 100; int incrementsPerThread = 1000; Incrementer[] all = new Incrementer[numThreads]; int expectedTotal = 0; @@ -211,9 +212,9 @@ public class TestAtomicOperation { LOG.info("Ignored", e); } } - assertICV(row, fam1, qual1, expectedTotal); - assertICV(row, fam1, qual2, expectedTotal*2); - assertICV(row, fam2, qual3, expectedTotal*3); + assertICV(row, fam1, qual1, expectedTotal, fast); + assertICV(row, fam1, qual2, expectedTotal*2, fast); + assertICV(row, fam2, qual3, expectedTotal*3, fast); LOG.info("testIncrementMultiThreads successfully verified that total is " + expectedTotal); } @@ -221,9 +222,11 @@ public class TestAtomicOperation { private void assertICV(byte [] row, byte [] familiy, byte[] qualifier, - long amount) throws IOException { + long amount, + boolean fast) throws IOException { // run a get and see? Get get = new Get(row); + if (fast) get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); get.addColumn(familiy, qualifier); Result result = region.get(get); assertEquals(1, result.size()); @@ -254,7 +257,8 @@ public class TestAtomicOperation { } /** - * A thread that makes a few increment calls + * A thread that makes increment calls always on the same row, this.row against two column + * families on this row. */ public static class Incrementer extends Thread { @@ -263,9 +267,8 @@ public class TestAtomicOperation { private final int amount; - public Incrementer(Region region, - int threadNumber, int amount, int numIncrements) { - super("incrementer." + threadNumber); + public Incrementer(Region region, int threadNumber, int amount, int numIncrements) { + super("Incrementer." + threadNumber); this.region = region; this.numIncrements = numIncrements; this.amount = amount; @@ -281,19 +284,19 @@ public class TestAtomicOperation { inc.addColumn(fam1, qual2, amount*2); inc.addColumn(fam2, qual3, amount*3); inc.setDurability(Durability.ASYNC_WAL); - region.increment(inc, HConstants.NO_NONCE, HConstants.NO_NONCE); - - // verify: Make sure we only see completed increments - Get g = new Get(row); - Result result = region.get(g); + Result result = region.increment(inc, HConstants.NO_NONCE, HConstants.NO_NONCE); if (result != null) { - assertTrue(result.getValue(fam1, qual1) != null); - assertTrue(result.getValue(fam1, qual2) != null); assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2, Bytes.toLong(result.getValue(fam1, qual2))); assertTrue(result.getValue(fam2, qual3) != null); assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*3, Bytes.toLong(result.getValue(fam2, qual3))); + assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2, + Bytes.toLong(result.getValue(fam1, qual2))); + long fam1Increment = Bytes.toLong(result.getValue(fam1, qual1))*3; + long fam2Increment = Bytes.toLong(result.getValue(fam2, qual3)); + assertEquals("fam1=" + fam1Increment + ", fam2=" + fam2Increment, + fam1Increment, fam2Increment); } } catch (IOException e) { e.printStackTrace(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java index c59d6f7..382193b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import 
org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -69,7 +70,6 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescripto import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl; import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult; -import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger; import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -167,9 +167,17 @@ public class TestHRegionReplayEvents { when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1)); when(rss.getConfiguration()).thenReturn(CONF); when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting()); - + String string = org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER + .toString(); + ExecutorService es = new ExecutorService(string); + es.startExecutorService( + string+"-"+string, 1); + when(rss.getExecutorService()).thenReturn(es); primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary); primaryRegion.close(); + List regions = new ArrayList(); + regions.add(primaryRegion); + when(rss.getOnlineRegions()).thenReturn(regions); primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); secondaryRegion = HRegion.openHRegion(secondaryHri, htd, null, CONF, rss, null); @@ -1370,7 +1378,10 @@ public class TestHRegionReplayEvents { // Test case 3: compact primary files primaryRegion.compactStores(); - CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, primaryRegion); + List regions = new ArrayList(); + regions.add(primaryRegion); + when(rss.getOnlineRegions()).thenReturn(regions); + CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, rss, false); cleaner.chore(); secondaryRegion.refreshStoreFiles(); assertPathListsEqual(primaryRegion.getStoreFileList(families), diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java index e0c1453..44b24ce 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java @@ -62,12 +62,12 @@ import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse; -import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import 
org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.PairOfSameType; import org.apache.hadoop.util.StringUtils; @@ -246,25 +246,37 @@ public class TestRegionMergeTransactionOnCluster { count += hrfs.getStoreFiles(colFamily.getName()).size(); } admin.compactRegion(mergedRegionInfo.getRegionName()); - // wait until merged region doesn't have reference file + // clean up the merged region store files + // wait until merged region have reference file long timeout = System.currentTimeMillis() + waitTime; + int newcount = 0; while (System.currentTimeMillis() < timeout) { - if (!hrfs.hasReferences(tableDescriptor)) { + for(HColumnDescriptor colFamily : columnFamilies) { + newcount += hrfs.getStoreFiles(colFamily.getName()).size(); + } + if(newcount > count) { break; } Thread.sleep(50); } - int newcount = 0; - for(HColumnDescriptor colFamily : columnFamilies) { - newcount += hrfs.getStoreFiles(colFamily.getName()).size(); - } assertTrue(newcount > count); - // clean up the merged region store files - List regions = - TEST_UTIL.getHBaseCluster().getRegions(tableDescriptor.getName()); - for (HRegion region : regions) { - CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, region); + List regionServerThreads = TEST_UTIL.getHBaseCluster() + .getRegionServerThreads(); + for (RegionServerThread rs : regionServerThreads) { + CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, + rs.getRegionServer(), false); cleaner.chore(); + Thread.sleep(1000); + } + int newcount1 = 0; + while (System.currentTimeMillis() < timeout) { + for(HColumnDescriptor colFamily : columnFamilies) { + newcount1 += hrfs.getStoreFiles(colFamily.getName()).size(); + } + if(newcount1 <= 1) { + break; + } + Thread.sleep(50); } // run CatalogJanitor to clean merge references in hbase:meta and archive the // files of merging regions diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java index 67258aa..99f5801 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver; import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.*; import java.io.IOException; +import java.util.List; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -47,9 +48,9 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; -import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.util.StringUtils; import org.junit.AfterClass; @@ -454,8 +455,18 @@ public class TestRegionReplicas { LOG.info("Force Major compaction on primary region " + hriPrimary); primaryRegion.compact(true); Assert.assertEquals(1, primaryRegion.getStore(f).getStorefilesCount()); + List 
+        .getRegionServerThreads();
+    HRegionServer hrs = null;
+    for (RegionServerThread rs : regionServerThreads) {
+      if (rs.getRegionServer()
+          .getOnlineRegion(primaryRegion.getRegionInfo().getRegionName()) != null) {
+        hrs = rs.getRegionServer();
+        break;
+      }
+    }
     CompactedHFilesDischarger cleaner =
-        new CompactedHFilesDischarger(100, null, (HRegion) primaryRegion);
+        new CompactedHFilesDischarger(100, null, hrs, false);
     cleaner.chore();
     // scan all the hfiles on the secondary.
     // since there are no read on the secondary when we ask locations to
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
index 0f7f23a..d99643d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
@@ -551,7 +551,7 @@ public class TestTags {
 
   public static class TestCoprocessorForTags extends BaseRegionObserver {
 
-    public static boolean checkTagPresence = false;
+    public static volatile boolean checkTagPresence = false;
     public static List tags = null;
 
     @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java
index 40539c4..c23e794 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.regionserver.compactions;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -38,10 +40,12 @@ import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -62,6 +66,7 @@ public class TestCompactedHFilesDischarger {
   private static CountDownLatch latch = new CountDownLatch(3);
   private static AtomicInteger counter = new AtomicInteger(0);
   private static AtomicInteger scanCompletedCounter = new AtomicInteger(0);
+  private RegionServerServices rss;
 
   @Before
   public void setUp() throws Exception {
@@ -71,6 +76,10 @@ public class TestCompactedHFilesDischarger {
     HRegionInfo info = new HRegionInfo(tableName, null, null, false);
     Path path = testUtil.getDataTestDir(getClass().getSimpleName());
     region = HBaseTestingUtility.createRegionAndWAL(info, path, testUtil.getConfiguration(), htd);
+    rss = mock(RegionServerServices.class);
+    List regions = new ArrayList();
+    regions.add(region);
+    when(rss.getOnlineRegions()).thenReturn(regions);
   }
 
   @After
@@ -86,7 +95,7 @@ public class TestCompactedHFilesDischarger {
   public void testCompactedHFilesCleaner() throws Exception {
     // Create the cleaner object
     CompactedHFilesDischarger cleaner =
-        new CompactedHFilesDischarger(1000, (Stoppable) null, (HRegion) region);
+        new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
     // Add some data to the region and do some flushes
     for (int i = 1; i < 10; i++) {
       Put p = new Put(Bytes.toBytes("row" + i));
@@ -152,7 +161,7 @@ public class TestCompactedHFilesDischarger {
   public void testCleanerWithParallelScannersAfterCompaction() throws Exception {
     // Create the cleaner object
     CompactedHFilesDischarger cleaner =
-        new CompactedHFilesDischarger(1000, (Stoppable) null, (HRegion) region);
+        new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
     // Add some data to the region and do some flushes
     for (int i = 1; i < 10; i++) {
       Put p = new Put(Bytes.toBytes("row" + i));
@@ -223,7 +232,7 @@ public class TestCompactedHFilesDischarger {
   public void testCleanerWithParallelScanners() throws Exception {
     // Create the cleaner object
     CompactedHFilesDischarger cleaner =
-        new CompactedHFilesDischarger(1000, (Stoppable) null, (HRegion) region);
+        new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
     // Add some data to the region and do some flushes
     for (int i = 1; i < 10; i++) {
       Put p = new Put(Bytes.toBytes("row" + i));
diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb
index c61b598..661783f 100644
--- a/hbase-shell/src/main/ruby/hbase/admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/admin.rb
@@ -280,7 +280,10 @@ module Hbase
     #----------------------------------------------------------------------------------------------
     # Parse arguments and update HTableDescriptor accordingly
     def parse_htd_args(htd, arg)
-      htd.setNormalizationEnabled(JBoolean.valueOf(arg.delete(NORMALIZATION_ENABLED))) if arg[NORMALIZATION_ENABLED]
+      if arg.has_key?(NORMALIZATION_MODE)
+        mode = arg.delete(NORMALIZATION_MODE)
+        htd.setValue(NORMALIZATION_MODE, mode)
+      end
     end
 
     #----------------------------------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/normalize.rb b/hbase-shell/src/main/ruby/shell/commands/normalize.rb
index 7e6302c..e2b3d42 100644
--- a/hbase-shell/src/main/ruby/shell/commands/normalize.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/normalize.rb
@@ -22,7 +22,7 @@ module Shell
     class Normalize < Command
       def help
         return <<-EOF
-Trigger region normalizer for all tables which have NORMALIZATION_ENABLED flag set. Returns true
+Trigger region normalizer for all tables which have NORMALIZATION_MODE flag set. Returns true
 if normalizer ran successfully, false otherwise.
 Note that this command has no effect if region normalizer is disabled (make sure it's
 turned on using 'normalizer_switch' command).
diff --git a/hbase-shell/src/main/ruby/shell/commands/normalizer_switch.rb b/hbase-shell/src/main/ruby/shell/commands/normalizer_switch.rb
index 6d959c4..ee9e2d1 100644
--- a/hbase-shell/src/main/ruby/shell/commands/normalizer_switch.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/normalizer_switch.rb
@@ -23,7 +23,8 @@ module Shell
       def help
         return <<-EOF
 Enable/Disable region normalizer. Returns previous normalizer state.
-When normalizer is enabled, it handles all tables with 'NORMALIZATION_ENABLED' => true.
+When normalizer is enabled, it handles all tables with a 'NORMALIZATION_MODE' flag specifying the
+types of normalization actions to apply.
 
 Examples:
 
   hbase> normalizer_switch true
diff --git a/pom.xml b/pom.xml
index 897b41c..ccc7eb5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1826,6 +1826,9 @@
         HBasePatchProcess
+
+        2
+
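
The test changes above all follow the same pattern: CompactedHFilesDischarger is no longer built around a single HRegion but around a RegionServerServices, which it queries for the online regions whose compacted-away store files should be discharged. The sketch below is a minimal illustration of that pattern, not code from the patch; the wrapper class and helper name are hypothetical, it mirrors the tests' use of a raw List for the mocked return value, and it only relies on calls visible in the hunks above (the four-argument constructor, getOnlineRegions(), and chore()).

```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;

// Hypothetical test helper, not part of the patch.
public class CompactedHFilesDischargerSketch {

  // Wires a mocked RegionServerServices around an existing test region and
  // runs one discharger pass over it, as the updated tests do.
  static void dischargeCompactedFiles(HRegion region) throws Exception {
    RegionServerServices rss = mock(RegionServerServices.class);

    // The discharger asks the server for its online regions, so the mock
    // must hand back the region(s) under test (raw List, as in the patch).
    List regions = new ArrayList();
    regions.add(region);
    when(rss.getOnlineRegions()).thenReturn(regions);

    // 100 ms period, no Stoppable, the mocked services; the final flag is
    // false in the tests above, apparently so the discharge runs inline
    // rather than through the region server's executor service.
    CompactedHFilesDischarger cleaner =
        new CompactedHFilesDischarger(100, (Stoppable) null, rss, false);
    cleaner.chore();
  }
}
```

In the mini-cluster tests the same constructor is handed a real HRegionServer instead of a mock, which is why TestRegionReplicas first walks the region server threads to find the server hosting the primary region before building the discharger.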