diff --git conf/hadoop-metrics2.properties conf/hadoop-metrics2.properties
new file mode 100644
index 0000000..451ec33
--- /dev/null
+++ conf/hadoop-metrics2.properties
@@ -0,0 +1,5 @@
+# syntax: [prefix].[source|sink|jmx].[instance].[options]
+# See package.html for org.apache.hadoop.metrics2 for details
+
+*.sink.file.class=org.apache.hadoop.metrics2.sink.FileSink
+
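Note: the line above only registers FileSink as the sink class named "file"; nothing is actually written until a filename option is configured for some prefix. A hypothetical example (kept commented out, as in the stock Hadoop file):

    # hbase.sink.file.filename=hbase-metrics.out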
diff --git hbase-hadoop-compat/pom.xml hbase-hadoop-compat/pom.xml
new file mode 100644
index 0000000..2511f26
--- /dev/null
+++ hbase-hadoop-compat/pom.xml
@@ -0,0 +1,81 @@
+<?xml version="1.0"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>hbase</artifactId>
+    <groupId>org.apache.hbase</groupId>
+    <version>0.95-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>hbase-hadoop-compat</artifactId>
+  <name>HBase - Hadoop Compatibility</name>
+  <description>
+    Interfaces to be implemented in order to smooth
+    over hadoop version differences
+  </description>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>secondPartTestsExecution</id>
+            <phase>test</phase>
+            <goals>
+              <goal>test</goal>
+            </goals>
+            <configuration>
+              <skip>true</skip>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics2/ReplicationMetricsSource.java hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics2/ReplicationMetricsSource.java
new file mode 100644
index 0000000..dde2928
--- /dev/null
+++ hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics2/ReplicationMetricsSource.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver.metrics2;
+
+/**
+ * Interface of a source of metrics about HBase replication. Implementations
+ * hide whether the hadoop1 or hadoop2 metrics2 API is used underneath.
+ */
+public interface ReplicationMetricsSource {
+
+ public void setGauge(String gaugeName, long value);
+
+ public void incGauge(String gaugeName, long delta);
+
+ public void decGauge(String gaugeName, long delta);
+
+ public void removeGauge(String key);
+
+ public void incCounters(String key, long delta);
+
+ public void removeCounter(String key);
+}
diff --git hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics2/ReplicationMetricsSourceFactory.java hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics2/ReplicationMetricsSourceFactory.java
new file mode 100644
index 0000000..38d9792
--- /dev/null
+++ hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics2/ReplicationMetricsSourceFactory.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver.metrics2;
+
+import org.apache.xbean.finder.ResourceFinder;
+
+import java.io.IOException;
+
+
+/**
+ * Factory that loads the single ReplicationMetricsSource implementation
+ * provided by whichever hadoop compatibility jar is on the classpath.
+ */
+public class ReplicationMetricsSourceFactory {
+
+ private static ReplicationMetricsSource rms = null;
+ private static ResourceFinder finder = new ResourceFinder("META-INF/services");
+
+ public static synchronized ReplicationMetricsSource getInstance() {
+ if (rms == null) {
+ Exception cause = null;
+ try {
+ rms = (ReplicationMetricsSource)
+ finder.findImplementation(ReplicationMetricsSource.class).newInstance();
+ } catch (IOException e) {
+ cause = e;
+ } catch (ClassNotFoundException e) {
+ cause = e;
+ } catch (InstantiationException e) {
+ cause = e;
+ } catch (IllegalAccessException e) {
+ cause = e;
+ }
+ if (rms == null) {
+ throw new RuntimeException(
+ "It appears no hadoop compatibility jar is on the classpath", cause);
+ }
+ }
+ return rms;
+ }
+
+}
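For orientation, a minimal caller sketch against the interface above (the metric names are illustrative, not taken from this patch):

    ReplicationMetricsSource rms = ReplicationMetricsSourceFactory.getInstance();
    rms.incCounters("sink.appliedBatches", 1);      // bump a counter
    rms.setGauge("sink.ageOfLastAppliedOp", 1500L); // overwrite a gauge
    rms.decGauge("source.sizeOfLogQueue", 1);       // move a gauge down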
diff --git hbase-hadoop-compat/src/main/java/org/apache/xbean/finder/ResourceFinder.java hbase-hadoop-compat/src/main/java/org/apache/xbean/finder/ResourceFinder.java
new file mode 100644
index 0000000..e90638b
--- /dev/null
+++ hbase-hadoop-compat/src/main/java/org/apache/xbean/finder/ResourceFinder.java
@@ -0,0 +1,1089 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.xbean.finder;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.JarURLConnection;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Vector;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
+
+/**
+ * This class is taken from:
+ * http://svn.apache.org/repos/asf/geronimo/xbean/tags/xbean-3.7/xbean-finder/src/main/java/org/apache/xbean/finder/ResourceFinder.java
+ *
+ * It is self contained, and the jar that normally contains it has lots of dependencies.
+ *
+ * @author David Blevins
+ * @version $Rev$ $Date$
+ */
+public class ResourceFinder {
+
+ private final URL[] urls;
+ private final String path;
+ private final ClassLoader classLoader;
+ private final List<String> resourcesNotLoaded = new ArrayList<String>();
+
+ public ResourceFinder(URL... urls) {
+ this(null, Thread.currentThread().getContextClassLoader(), urls);
+ }
+
+ public ResourceFinder(String path) {
+ this(path, Thread.currentThread().getContextClassLoader(), null);
+ }
+
+ public ResourceFinder(String path, URL... urls) {
+ this(path, Thread.currentThread().getContextClassLoader(), urls);
+ }
+
+ public ResourceFinder(String path, ClassLoader classLoader) {
+ this(path, classLoader, null);
+ }
+
+ public ResourceFinder(String path, ClassLoader classLoader, URL... urls) {
+ if (path == null){
+ path = "";
+ } else if (path.length() > 0 && !path.endsWith("/")) {
+ path += "/";
+ }
+ this.path = path;
+
+ if (classLoader == null) {
+ classLoader = Thread.currentThread().getContextClassLoader();
+ }
+ this.classLoader = classLoader;
+
+ for (int i = 0; urls != null && i < urls.length; i++) {
+ URL url = urls[i];
+ if (url == null || isDirectory(url) || url.getProtocol().equals("jar")) {
+ continue;
+ }
+ try {
+ urls[i] = new URL("jar", "", -1, url.toString() + "!/");
+ } catch (MalformedURLException e) {
+ }
+ }
+ this.urls = (urls == null || urls.length == 0)? null : urls;
+ }
+
+ private static boolean isDirectory(URL url) {
+ String file = url.getFile();
+ return (file.length() > 0 && file.charAt(file.length() - 1) == '/');
+ }
+
+ /**
+ * Returns a list of resources that could not be loaded in the last invoked findAvailable* or
+ * mapAvailable* methods.
+ *
+ * The list will only contain entries of resources that match the requirements
+ * of the last invoked findAvailable* or mapAvailable* methods, but were unable to be
+ * loaded and included in their results.
+ *
+ * The list returned is unmodifiable and the results of this method will change
+ * after each invocation of a findAvailable* or mapAvailable* methods.
+ *
+ * This method is not thread safe.
+ */
+ public List<String> getResourcesNotLoaded() {
+ return Collections.unmodifiableList(resourcesNotLoaded);
+ }
+
+ // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
+ //
+ // Find
+ //
+ // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
+
+ public URL find(String uri) throws IOException {
+ String fullUri = path + uri;
+
+ URL resource = getResource(fullUri);
+ if (resource == null) {
+ throw new IOException("Could not find resource '" + fullUri + "'");
+ }
+
+ return resource;
+ }
+
+ public List<URL> findAll(String uri) throws IOException {
+ String fullUri = path + uri;
+
+ Enumeration<URL> resources = getResources(fullUri);
+ List<URL> list = new ArrayList<URL>();
+ while (resources.hasMoreElements()) {
+ URL url = resources.nextElement();
+ list.add(url);
+ }
+ return list;
+ }
+
+
+ // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
+ //
+ // Find String
+ //
+ // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
+
+ /**
+ * Reads the contents of the URL as a {@link String} and returns it.
+ *
+ * @param uri
+ * @return a stringified content of a resource
+ * @throws IOException if a resource pointed out by the uri param could not be found
+ * @see ClassLoader#getResource(String)
+ */
+ public String findString(String uri) throws IOException {
+ String fullUri = path + uri;
+
+ URL resource = getResource(fullUri);
+ if (resource == null) {
+ throw new IOException("Could not find a resource in : " + fullUri);
+ }
+
+ return readContents(resource);
+ }
+
+ /**
+ * Reads the contents of the found URLs as a list of {@link String}'s and returns them.
+ *
+ * @param uri
+ * @return a list of the content of each resource URL found
+ * @throws IOException if any of the found URLs are unable to be read.
+ */
+ public List<String> findAllStrings(String uri) throws IOException {
+ String fulluri = path + uri;
+
+ List<String> strings = new ArrayList<String>();
+
+ Enumeration<URL> resources = getResources(fulluri);
+ while (resources.hasMoreElements()) {
+ URL url = resources.nextElement();
+ String string = readContents(url);
+ strings.add(string);
+ }
+ return strings;
+ }
+
+ /**
+ * Reads the contents of the found URLs as Strings and returns them.
+ * Individual URLs that cannot be read are skipped and added to the
+ * list of 'resourcesNotLoaded'
+ *
+ * @param uri
+ * @return a list of the content of each resource URL found
+ * @throws IOException if classLoader.getResources throws an exception
+ */
+ public List<String> findAvailableStrings(String uri) throws IOException {
+ resourcesNotLoaded.clear();
+ String fulluri = path + uri;
+
+ List<String> strings = new ArrayList<String>();
+
+ Enumeration<URL> resources = getResources(fulluri);
+ while (resources.hasMoreElements()) {
+ URL url = resources.nextElement();
+ try {
+ String string = readContents(url);
+ strings.add(string);
+ } catch (IOException notAvailable) {
+ resourcesNotLoaded.add(url.toExternalForm());
+ }
+ }
+ return strings;
+ }
+
+ /**
+ * Reads the contents of all non-directory URLs immediately under the specified
+ * location and returns them in a map keyed by the file name.
+ *
+ * Any URLs that cannot be read will cause an exception to be thrown.
+ *
+ * Example classpath:
+ *
+ * META-INF/serializables/one
+ * META-INF/serializables/two
+ * META-INF/serializables/three
+ * META-INF/serializables/four/foo.txt
+ *
+ * ResourceFinder finder = new ResourceFinder("META-INF/");
+ * Map map = finder.mapAvailableStrings("serializables");
+ * map.contains("one"); // true
+ * map.contains("two"); // true
+ * map.contains("three"); // true
+ * map.contains("four"); // false
+ *
+ * @param uri
+ * @return a list of the content of each resource URL found
+ * @throws IOException if any of the urls cannot be read
+ */
+ public Map<String, String> mapAllStrings(String uri) throws IOException {
+ Map<String, String> strings = new HashMap<String, String>();
+ Map<String, URL> resourcesMap = getResourcesMap(uri);
+ for (Iterator iterator = resourcesMap.entrySet().iterator(); iterator.hasNext();) {
+ Map.Entry entry = (Map.Entry) iterator.next();
+ String name = (String) entry.getKey();
+ URL url = (URL) entry.getValue();
+ String value = readContents(url);
+ strings.put(name, value);
+ }
+ return strings;
+ }
+
+ /**
+ * Reads the contents of all non-directory URLs immediately under the specified
+ * location and returns them in a map keyed by the file name.
+ *
+ * Individual URLs that cannot be read are skipped and added to the
+ * list of 'resourcesNotLoaded'
+ *
+ * Example classpath:
+ *
+ * META-INF/serializables/one
+ * META-INF/serializables/two # not readable
+ * META-INF/serializables/three
+ * META-INF/serializables/four/foo.txt
+ *
+ * ResourceFinder finder = new ResourceFinder("META-INF/");
+ * Map map = finder.mapAvailableStrings("serializables");
+ * map.contains("one"); // true
+ * map.contains("two"); // false
+ * map.contains("three"); // true
+ * map.contains("four"); // false
+ *
+ * @param uri
+ * @return a list of the content of each resource URL found
+ * @throws IOException if classLoader.getResources throws an exception
+ */
+ public Map<String, String> mapAvailableStrings(String uri) throws IOException {
+ resourcesNotLoaded.clear();
+ Map<String, String> strings = new HashMap<String, String>();
+ Map<String, URL> resourcesMap = getResourcesMap(uri);
+ for (Iterator iterator = resourcesMap.entrySet().iterator(); iterator.hasNext();) {
+ Map.Entry entry = (Map.Entry) iterator.next();
+ String name = (String) entry.getKey();
+ URL url = (URL) entry.getValue();
+ try {
+ String value = readContents(url);
+ strings.put(name, value);
+ } catch (IOException notAvailable) {
+ resourcesNotLoaded.add(url.toExternalForm());
+ }
+ }
+ return strings;
+ }
+
+ // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
+ //
+ // Find Class
+ //
+ // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
+
+ /**
+ * Executes {@link #findString(String)} assuming the contents URL found is the name of
+ * a class that should be loaded and returned.
+ *
+ * @param uri
+ * @return
+ * @throws IOException
+ * @throws ClassNotFoundException
+ */
+ public Class findClass(String uri) throws IOException, ClassNotFoundException {
+ String className = findString(uri);
+ return (Class) classLoader.loadClass(className);
+ }
+
+ /**
+ * Executes findAllStrings assuming the strings are
+ * the names of a classes that should be loaded and returned.
+ *
+ * Any URL or class that cannot be loaded will cause an exception to be thrown.
+ *
+ * @param uri
+ * @return
+ * @throws IOException
+ * @throws ClassNotFoundException
+ */
+ public List<Class> findAllClasses(String uri) throws IOException, ClassNotFoundException {
+ List<Class> classes = new ArrayList<Class>();
+ List<String> strings = findAllStrings(uri);
+ for (String className : strings) {
+ Class clazz = classLoader.loadClass(className);
+ classes.add(clazz);
+ }
+ return classes;
+ }
+
+ /**
+ * Executes findAvailableStrings assuming the strings are
+ * the names of a classes that should be loaded and returned.
+ *
+ * Any class that cannot be loaded will be skipped and placed in the
+ * 'resourcesNotLoaded' collection.
+ *
+ * @param uri
+ * @return
+ * @throws IOException if classLoader.getResources throws an exception
+ */
+ public List<Class> findAvailableClasses(String uri) throws IOException {
+ resourcesNotLoaded.clear();
+ List<Class> classes = new ArrayList<Class>();
+ List<String> strings = findAvailableStrings(uri);
+ for (String className : strings) {
+ try {
+ Class clazz = classLoader.loadClass(className);
+ classes.add(clazz);
+ } catch (Exception notAvailable) {
+ resourcesNotLoaded.add(className);
+ }
+ }
+ return classes;
+ }
+
+ /**
+ * Executes mapAllStrings assuming the value of each entry in the
+ * map is the name of a class that should be loaded.
+ *
+ * Any class that cannot be loaded will cause an exception to be thrown.
+ *
+ * Example classpath:
+ *
+ * META-INF/xmlparsers/xerces
+ * META-INF/xmlparsers/crimson
+ *
+ * ResourceFinder finder = new ResourceFinder("META-INF/");
+ * Map map = finder.mapAvailableStrings("xmlparsers");
+ * map.contains("xerces"); // true
+ * map.contains("crimson"); // true
+ * Class xercesClass = map.get("xerces");
+ * Class crimsonClass = map.get("crimson");
+ *
+ * @param uri
+ * @return
+ * @throws IOException
+ * @throws ClassNotFoundException
+ */
+ public Map<String, Class> mapAllClasses(String uri) throws IOException, ClassNotFoundException {
+ Map<String, Class> classes = new HashMap<String, Class>();
+ Map<String, String> map = mapAllStrings(uri);
+ for (Iterator iterator = map.entrySet().iterator(); iterator.hasNext();) {
+ Map.Entry entry = (Map.Entry) iterator.next();
+ String string = (String) entry.getKey();
+ String className = (String) entry.getValue();
+ Class clazz = classLoader.loadClass(className);
+ classes.put(string, clazz);
+ }
+ return classes;
+ }
+
+ /**
+ * Executes mapAvailableStrings assuming the value of each entry in the
+ * map is the name of a class that should be loaded.
+ *
+ * Any class that cannot be loaded will be skipped and placed in the
+ * 'resourcesNotLoaded' collection.
+ *
+ * Example classpath:
+ *
+ * META-INF/xmlparsers/xerces
+ * META-INF/xmlparsers/crimson
+ *
+ * ResourceFinder finder = new ResourceFinder("META-INF/");
+ * Map map = finder.mapAvailableStrings("xmlparsers");
+ * map.contains("xerces"); // true
+ * map.contains("crimson"); // true
+ * Class xercesClass = map.get("xerces");
+ * Class crimsonClass = map.get("crimson");
+ *
+ * @param uri
+ * @return
+ * @throws IOException if classLoader.getResources throws an exception
+ */
+ public Map<String, Class> mapAvailableClasses(String uri) throws IOException {
+ resourcesNotLoaded.clear();
+ Map<String, Class> classes = new HashMap<String, Class>();
+ Map<String, String> map = mapAvailableStrings(uri);
+ for (Iterator iterator = map.entrySet().iterator(); iterator.hasNext();) {
+ Map.Entry entry = (Map.Entry) iterator.next();
+ String string = (String) entry.getKey();
+ String className = (String) entry.getValue();
+ try {
+ Class clazz = classLoader.loadClass(className);
+ classes.put(string, clazz);
+ } catch (Exception notAvailable) {
+ resourcesNotLoaded.add(className);
+ }
+ }
+ return classes;
+ }
+
+ // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
+ //
+ // Find Implementation
+ //
+ // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
+
+ /**
+ * Assumes the class specified points to a file in the classpath that contains
+ * the name of a class that implements or is a subclass of the specified class.
+ *
+ * Any class that cannot be loaded will cause an exception to be thrown.
+ *
+ * Example classpath:
+ *
+ * META-INF/java.io.InputStream # contains the classname org.acme.AcmeInputStream
+ * META-INF/java.io.OutputStream
+ *
+ * ResourceFinder finder = new ResourceFinder("META-INF/");
+ * Class clazz = finder.findImplementation(java.io.InputStream.class);
+ * clazz.getName(); // returns "org.acme.AcmeInputStream"
+ *
+ * @param interfase a superclass or interface
+ * @return
+ * @throws IOException if the URL cannot be read
+ * @throws ClassNotFoundException if the class found is not loadable
+ * @throws ClassCastException if the class found is not assignable to the specified superclass or interface
+ */
+ public Class findImplementation(Class interfase) throws IOException, ClassNotFoundException {
+ String className = findString(interfase.getName());
+ Class impl = classLoader.loadClass(className);
+ if (!interfase.isAssignableFrom(impl)) {
+ throw new ClassCastException("Class not of type: " + interfase.getName());
+ }
+ return impl;
+ }
+
+ /**
+ * Assumes the class specified points to a file in the classpath that contains
+ * the name of a class that implements or is a subclass of the specified class.
+ *
+ * Any class that cannot be loaded or assigned to the specified interface will cause
+ * an exception to be thrown.
+ *
+ * Example classpath:
+ *
+ * META-INF/java.io.InputStream # contains the classname org.acme.AcmeInputStream
+ * META-INF/java.io.InputStream # contains the classname org.widget.NeatoInputStream
+ * META-INF/java.io.InputStream # contains the classname com.foo.BarInputStream
+ *
+ * ResourceFinder finder = new ResourceFinder("META-INF/");
+ * List classes = finder.findAllImplementations(java.io.InputStream.class);
+ * classes.contains("org.acme.AcmeInputStream"); // true
+ * classes.contains("org.widget.NeatoInputStream"); // true
+ * classes.contains("com.foo.BarInputStream"); // true
+ *
+ * @param interfase a superclass or interface
+ * @return
+ * @throws IOException if the URL cannot be read
+ * @throws ClassNotFoundException if the class found is not loadable
+ * @throws ClassCastException if the class found is not assignable to the specified superclass or interface
+ */
+ public List<Class> findAllImplementations(Class interfase) throws IOException, ClassNotFoundException {
+ List<Class> implementations = new ArrayList<Class>();
+ List<String> strings = findAllStrings(interfase.getName());
+ for (String className : strings) {
+ Class impl = classLoader.loadClass(className);
+ if (!interfase.isAssignableFrom(impl)) {
+ throw new ClassCastException("Class not of type: " + interfase.getName());
+ }
+ implementations.add(impl);
+ }
+ return implementations;
+ }
+
+ /**
+ * Assumes the class specified points to a file in the classpath that contains
+ * the name of a class that implements or is a subclass of the specified class.
+ *
+ * Any class that cannot be loaded or is not assignable to the specified class will be
+ * skipped and placed in the 'resourcesNotLoaded' collection.
+ *
+ * Example classpath:
+ *
+ * META-INF/java.io.InputStream # contains the classname org.acme.AcmeInputStream
+ * META-INF/java.io.InputStream # contains the classname org.widget.NeatoInputStream
+ * META-INF/java.io.InputStream # contains the classname com.foo.BarInputStream
+ *
+ * ResourceFinder finder = new ResourceFinder("META-INF/");
+ * List classes = finder.findAllImplementations(java.io.InputStream.class);
+ * classes.contains("org.acme.AcmeInputStream"); // true
+ * classes.contains("org.widget.NeatoInputStream"); // true
+ * classes.contains("com.foo.BarInputStream"); // true
+ *
+ * @param interfase a superclass or interface
+ * @return
+ * @throws IOException if classLoader.getResources throws an exception
+ */
+ public List<Class> findAvailableImplementations(Class interfase) throws IOException {
+ resourcesNotLoaded.clear();
+ List<Class> implementations = new ArrayList<Class>();
+ List<String> strings = findAvailableStrings(interfase.getName());
+ for (String className : strings) {
+ try {
+ Class impl = classLoader.loadClass(className);
+ if (interfase.isAssignableFrom(impl)) {
+ implementations.add(impl);
+ } else {
+ resourcesNotLoaded.add(className);
+ }
+ } catch (Exception notAvailable) {
+ resourcesNotLoaded.add(className);
+ }
+ }
+ return implementations;
+ }
+
+ /**
+ * Assumes the class specified points to a directory in the classpath that holds files
+ * containing the name of a class that implements or is a subclass of the specified class.
+ *
+ * Any class that cannot be loaded or assigned to the specified interface will cause
+ * an exception to be thrown.
+ *
+ * Example classpath:
+ *
+ * META-INF/java.net.URLStreamHandler/jar
+ * META-INF/java.net.URLStreamHandler/file
+ * META-INF/java.net.URLStreamHandler/http
+ *
+ * ResourceFinder finder = new ResourceFinder("META-INF/");
+ * Map map = finder.mapAllImplementations(java.net.URLStreamHandler.class);
+ * Class jarUrlHandler = map.get("jar");
+ * Class fileUrlHandler = map.get("file");
+ * Class httpUrlHandler = map.get("http");
+ *
+ * @param interfase a superclass or interface
+ * @return
+ * @throws IOException if the URL cannot be read
+ * @throws ClassNotFoundException if the class found is not loadable
+ * @throws ClassCastException if the class found is not assignable to the specified superclass or interface
+ */
+ public Map<String, Class> mapAllImplementations(Class interfase) throws IOException, ClassNotFoundException {
+ Map<String, Class> implementations = new HashMap<String, Class>();
+ Map<String, String> map = mapAllStrings(interfase.getName());
+ for (Iterator iterator = map.entrySet().iterator(); iterator.hasNext();) {
+ Map.Entry entry = (Map.Entry) iterator.next();
+ String string = (String) entry.getKey();
+ String className = (String) entry.getValue();
+ Class impl = classLoader.loadClass(className);
+ if (!interfase.isAssignableFrom(impl)) {
+ throw new ClassCastException("Class not of type: " + interfase.getName());
+ }
+ implementations.put(string, impl);
+ }
+ return implementations;
+ }
+
+ /**
+ * Assumes the class specified points to a directory in the classpath that holds files
+ * containing the name of a class that implements or is a subclass of the specified class.
+ *
+ * Any class that cannot be loaded or is not assignable to the specified class will be
+ * skipped and placed in the 'resourcesNotLoaded' collection.
+ *
+ * Example classpath:
+ *
+ * META-INF/java.net.URLStreamHandler/jar
+ * META-INF/java.net.URLStreamHandler/file
+ * META-INF/java.net.URLStreamHandler/http
+ *
+ * ResourceFinder finder = new ResourceFinder("META-INF/");
+ * Map map = finder.mapAllImplementations(java.net.URLStreamHandler.class);
+ * Class jarUrlHandler = map.get("jar");
+ * Class fileUrlHandler = map.get("file");
+ * Class httpUrlHandler = map.get("http");
+ *
+ * @param interfase a superclass or interface
+ * @return
+ * @throws IOException if classLoader.getResources throws an exception
+ */
+ public Map<String, Class> mapAvailableImplementations(Class interfase) throws IOException {
+ resourcesNotLoaded.clear();
+ Map<String, Class> implementations = new HashMap<String, Class>();
+ Map<String, String> map = mapAvailableStrings(interfase.getName());
+ for (Iterator iterator = map.entrySet().iterator(); iterator.hasNext();) {
+ Map.Entry entry = (Map.Entry) iterator.next();
+ String string = (String) entry.getKey();
+ String className = (String) entry.getValue();
+ try {
+ Class impl = classLoader.loadClass(className);
+ if (interfase.isAssignableFrom(impl)) {
+ implementations.put(string, impl);
+ } else {
+ resourcesNotLoaded.add(className);
+ }
+ } catch (Exception notAvailable) {
+ resourcesNotLoaded.add(className);
+ }
+ }
+ return implementations;
+ }
+
+ // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
+ //
+ // Find Properties
+ //
+ // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
+
+ /**
+ * Finds the corresponding resource and reads it in as a properties file
+ *
+ * Example classpath:
+ *
+ * META-INF/widget.properties
+ *
+ * ResourceFinder finder = new ResourceFinder("META-INF/");
+ * Properties widgetProps = finder.findProperties("widget.properties");
+ *
+ * @param uri
+ * @return
+ * @throws IOException if the URL cannot be read or is not in properties file format
+ */
+ public Properties findProperties(String uri) throws IOException {
+ String fulluri = path + uri;
+
+ URL resource = getResource(fulluri);
+ if (resource == null) {
+ throw new IOException("Could not find resource: " + fulluri);
+ }
+
+ return loadProperties(resource);
+ }
+
+ /**
+ * Finds the corresponding resources and reads them in as a properties files
+ *
+ * Any URL that cannot be read in as a properties file will cause an exception to be thrown.
+ *
+ * Example classpath:
+ *
+ * META-INF/app.properties
+ * META-INF/app.properties
+ * META-INF/app.properties
+ *
+ * ResourceFinder finder = new ResourceFinder("META-INF/");
+ * List appProps = finder.findAllProperties("app.properties");
+ *
+ * @param uri
+ * @return
+ * @throws IOException if the URL cannot be read or is not in properties file format
+ */
+ public List<Properties> findAllProperties(String uri) throws IOException {
+ String fulluri = path + uri;
+
+ List<Properties> properties = new ArrayList<Properties>();
+
+ Enumeration<URL> resources = getResources(fulluri);
+ while (resources.hasMoreElements()) {
+ URL url = resources.nextElement();
+ Properties props = loadProperties(url);
+ properties.add(props);
+ }
+ return properties;
+ }
+
+ /**
+ * Finds the corresponding resources and reads them in as a properties files
+ *
+ * Any URL that cannot be read in as a properties file will be added to the
+ * 'resourcesNotLoaded' collection.
+ *
+ * Example classpath:
+ *
+ * META-INF/app.properties
+ * META-INF/app.properties
+ * META-INF/app.properties
+ *
+ * ResourceFinder finder = new ResourceFinder("META-INF/");
+ * List appProps = finder.findAvailableProperties("app.properties");
+ *
+ * @param uri
+ * @return
+ * @throws IOException if classLoader.getResources throws an exception
+ */
+ public List<Properties> findAvailableProperties(String uri) throws IOException {
+ resourcesNotLoaded.clear();
+ String fulluri = path + uri;
+
+ List<Properties> properties = new ArrayList<Properties>();
+
+ Enumeration<URL> resources = getResources(fulluri);
+ while (resources.hasMoreElements()) {
+ URL url = resources.nextElement();
+ try {
+ Properties props = loadProperties(url);
+ properties.add(props);
+ } catch (Exception notAvailable) {
+ resourcesNotLoaded.add(url.toExternalForm());
+ }
+ }
+ return properties;
+ }
+
+ /**
+ * Finds the corresponding resources and reads them in as a properties files
+ *
+ * Any URL that cannot be read in as a properties file will cause an exception to be thrown.
+ *
+ * Example classpath:
+ *
+ * META-INF/jdbcDrivers/oracle.properties
+ * META-INF/jdbcDrivers/mysql.props
+ * META-INF/jdbcDrivers/derby
+ *
+ * ResourceFinder finder = new ResourceFinder("META-INF/");
+ * Map driversMap = finder.mapAllProperties("jdbcDrivers");
+ * Properties oracleProps = driversMap.get("oracle.properties");
+ * Properties mysqlProps = driversMap.get("mysql.props");
+ * Properties derbyProps = driversMap.get("derby");
+ *
+ * @param uri
+ * @return
+ * @throws IOException if the URL cannot be read or is not in properties file format
+ */
+ public Map<String, Properties> mapAllProperties(String uri) throws IOException {
+ Map<String, Properties> propertiesMap = new HashMap<String, Properties>();
+ Map<String, URL> map = getResourcesMap(uri);
+ for (Iterator iterator = map.entrySet().iterator(); iterator.hasNext();) {
+ Map.Entry entry = (Map.Entry) iterator.next();
+ String string = (String) entry.getKey();
+ URL url = (URL) entry.getValue();
+ Properties properties = loadProperties(url);
+ propertiesMap.put(string, properties);
+ }
+ return propertiesMap;
+ }
+
+ /**
+ * Finds the corresponding resources and reads them in as a properties files
+ *
+ * Any URL that cannot be read in as a properties file will be added to the
+ * 'resourcesNotLoaded' collection.
+ *
+ * Example classpath:
+ *
+ * META-INF/jdbcDrivers/oracle.properties
+ * META-INF/jdbcDrivers/mysql.props
+ * META-INF/jdbcDrivers/derby
+ *
+ * ResourceFinder finder = new ResourceFinder("META-INF/");
+ * Map driversMap = finder.mapAvailableProperties("jdbcDrivers");
+ * Properties oracleProps = driversMap.get("oracle.properties");
+ * Properties mysqlProps = driversMap.get("mysql.props");
+ * Properties derbyProps = driversMap.get("derby");
+ *
+ * @param uri
+ * @return
+ * @throws IOException if classLoader.getResources throws an exception
+ */
+ public Map<String, Properties> mapAvailableProperties(String uri) throws IOException {
+ resourcesNotLoaded.clear();
+ Map<String, Properties> propertiesMap = new HashMap<String, Properties>();
+ Map<String, URL> map = getResourcesMap(uri);
+ for (Iterator iterator = map.entrySet().iterator(); iterator.hasNext();) {
+ Map.Entry entry = (Map.Entry) iterator.next();
+ String string = (String) entry.getKey();
+ URL url = (URL) entry.getValue();
+ try {
+ Properties properties = loadProperties(url);
+ propertiesMap.put(string, properties);
+ } catch (Exception notAvailable) {
+ resourcesNotLoaded.add(url.toExternalForm());
+ }
+ }
+ return propertiesMap;
+ }
+
+ // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
+ //
+ // Map Resources
+ //
+ // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
+
+ public Map<String, URL> getResourcesMap(String uri) throws IOException {
+ String basePath = path + uri;
+
+ Map<String, URL> resources = new HashMap<String, URL>();
+ if (!basePath.endsWith("/")) {
+ basePath += "/";
+ }
+ Enumeration<URL> urls = getResources(basePath);
+
+ while (urls.hasMoreElements()) {
+ URL location = urls.nextElement();
+
+ try {
+ if (location.getProtocol().equals("jar")) {
+
+ readJarEntries(location, basePath, resources);
+
+ } else if (location.getProtocol().equals("file")) {
+
+ readDirectoryEntries(location, resources);
+
+ }
+ } catch (Exception e) {
+ }
+ }
+
+ return resources;
+ }
+
+ private static void readDirectoryEntries(URL location, Map<String, URL> resources) throws MalformedURLException {
+ File dir = new File(URLDecoder.decode(location.getPath()));
+ if (dir.isDirectory()) {
+ File[] files = dir.listFiles();
+ for (File file : files) {
+ if (!file.isDirectory()) {
+ String name = file.getName();
+ URL url = file.toURI().toURL();
+ resources.put(name, url);
+ }
+ }
+ }
+ }
+
+ private static void readJarEntries(URL location, String basePath, Map<String, URL> resources) throws IOException {
+ JarURLConnection conn = (JarURLConnection) location.openConnection();
+ JarFile jarfile = conn.getJarFile();
+
+ Enumeration<JarEntry> entries = jarfile.entries();
+ while (entries != null && entries.hasMoreElements()) {
+ JarEntry entry = entries.nextElement();
+ String name = entry.getName();
+
+ if (entry.isDirectory() || !name.startsWith(basePath) || name.length() == basePath.length()) {
+ continue;
+ }
+
+ name = name.substring(basePath.length());
+
+ if (name.contains("/")) {
+ continue;
+ }
+
+ URL resource = new URL(location, name);
+ resources.put(name, resource);
+ }
+ }
+
+ private Properties loadProperties(URL resource) throws IOException {
+ InputStream in = resource.openStream();
+
+ BufferedInputStream reader = null;
+ try {
+ reader = new BufferedInputStream(in);
+ Properties properties = new Properties();
+ properties.load(reader);
+
+ return properties;
+ } finally {
+ try {
+ in.close();
+ reader.close();
+ } catch (Exception e) {
+ }
+ }
+ }
+
+ private String readContents(URL resource) throws IOException {
+ InputStream in = resource.openStream();
+ BufferedInputStream reader = null;
+ StringBuffer sb = new StringBuffer();
+
+ try {
+ reader = new BufferedInputStream(in);
+
+ int b = reader.read();
+ while (b != -1) {
+ sb.append((char) b);
+ b = reader.read();
+ }
+
+ return sb.toString().trim();
+ } finally {
+ try {
+ in.close();
+ reader.close();
+ } catch (Exception e) {
+ }
+ }
+ }
+
+ private URL getResource(String fullUri) {
+ if (urls == null){
+ return classLoader.getResource(fullUri);
+ }
+ return findResource(fullUri, urls);
+ }
+
+ private Enumeration<URL> getResources(String fulluri) throws IOException {
+ if (urls == null) {
+ return classLoader.getResources(fulluri);
+ }
+ Vector<URL> resources = new Vector<URL>();
+ for (URL url : urls) {
+ URL resource = findResource(fulluri, url);
+ if (resource != null){
+ resources.add(resource);
+ }
+ }
+ return resources.elements();
+ }
+
+ private URL findResource(String resourceName, URL... search) {
+ for (int i = 0; i < search.length; i++) {
+ URL currentUrl = search[i];
+ if (currentUrl == null) {
+ continue;
+ }
+
+ try {
+ String protocol = currentUrl.getProtocol();
+ if (protocol.equals("jar")) {
+ /*
+ * If the connection for currentUrl or resURL is
+ * used, getJarFile() will throw an exception if the
+ * entry doesn't exist.
+ */
+ URL jarURL = ((JarURLConnection) currentUrl.openConnection()).getJarFileURL();
+ JarFile jarFile;
+ JarURLConnection juc;
+ try {
+ juc = (JarURLConnection) new URL("jar", "", jarURL.toExternalForm() + "!/").openConnection();
+ jarFile = juc.getJarFile();
+ } catch (IOException e) {
+ // Don't look for this jar file again
+ search[i] = null;
+ throw e;
+ }
+
+ try {
+ juc = (JarURLConnection) new URL("jar", "", jarURL.toExternalForm() + "!/").openConnection();
+ jarFile = juc.getJarFile();
+ String entryName;
+ if (currentUrl.getFile().endsWith("!/")) {
+ entryName = resourceName;
+ } else {
+ String file = currentUrl.getFile();
+ int sepIdx = file.lastIndexOf("!/");
+ if (sepIdx == -1) {
+ // Invalid URL, don't look here again
+ search[i] = null;
+ continue;
+ }
+ sepIdx += 2;
+ StringBuffer sb = new StringBuffer(file.length() - sepIdx + resourceName.length());
+ sb.append(file.substring(sepIdx));
+ sb.append(resourceName);
+ entryName = sb.toString();
+ }
+ if (entryName.equals("META-INF/") && jarFile.getEntry("META-INF/MANIFEST.MF") != null) {
+ return targetURL(currentUrl, "META-INF/MANIFEST.MF");
+ }
+ if (jarFile.getEntry(entryName) != null) {
+ return targetURL(currentUrl, resourceName);
+ }
+ } finally {
+ if (!juc.getUseCaches()) {
+ try {
+ jarFile.close();
+ } catch (Exception e) {
+ }
+ }
+ }
+
+ } else if (protocol.equals("file")) {
+ String baseFile = currentUrl.getFile();
+ String host = currentUrl.getHost();
+ int hostLength = 0;
+ if (host != null) {
+ hostLength = host.length();
+ }
+ StringBuffer buf = new StringBuffer(2 + hostLength + baseFile.length() + resourceName.length());
+
+ if (hostLength > 0) {
+ buf.append("//").append(host);
+ }
+ // baseFile always ends with '/'
+ buf.append(baseFile);
+ String fixedResName = resourceName;
+ // Do not create a UNC path, i.e. \\host
+ while (fixedResName.startsWith("/") || fixedResName.startsWith("\\")) {
+ fixedResName = fixedResName.substring(1);
+ }
+ buf.append(fixedResName);
+ String filename = buf.toString();
+ File file = new File(filename);
+ File file2 = new File(URLDecoder.decode(filename));
+
+ if (file.exists() || file2.exists()) {
+ return targetURL(currentUrl, fixedResName);
+ }
+ } else {
+ URL resourceURL = targetURL(currentUrl, resourceName);
+ URLConnection urlConnection = resourceURL.openConnection();
+
+ try {
+ urlConnection.getInputStream().close();
+ } catch (SecurityException e) {
+ return null;
+ }
+ // HTTP can return a stream on a non-existent file
+ // So check for the return code;
+ if (!resourceURL.getProtocol().equals("http")) {
+ return resourceURL;
+ }
+
+ int code = ((HttpURLConnection) urlConnection).getResponseCode();
+ if (code >= 200 && code < 300) {
+ return resourceURL;
+ }
+ }
+ } catch (MalformedURLException e) {
+ // Keep iterating through the URL list
+ } catch (IOException e) {
+ } catch (SecurityException e) {
+ }
+ }
+ return null;
+ }
+
+ private URL targetURL(URL base, String name) throws MalformedURLException {
+ StringBuffer sb = new StringBuffer(base.getFile().length() + name.length());
+ sb.append(base.getFile());
+ sb.append(name);
+ String file = sb.toString();
+ return new URL(base.getProtocol(), base.getHost(), base.getPort(), file, null);
+ }
+}
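For reference, a sketch of the lookup the factory above delegates to (the Codec interface and its service file are hypothetical):

    // Classpath holds META-INF/services/com.example.Codec, whose single
    // line names an implementing class.
    ResourceFinder finder = new ResourceFinder("META-INF/services");
    Class impl = finder.findImplementation(com.example.Codec.class);
    Object codec = impl.newInstance(); // implementation needs a no-arg constructor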
diff --git hbase-hadoop1-compat/pom.xml hbase-hadoop1-compat/pom.xml
new file mode 100644
index 0000000..fcddcf2
--- /dev/null
+++ hbase-hadoop1-compat/pom.xml
@@ -0,0 +1,103 @@
+<?xml version="1.0"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>hbase</artifactId>
+    <groupId>org.apache.hbase</groupId>
+    <version>0.95-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>hbase-hadoop1-compat</artifactId>
+  <name>HBase - Hadoop One Compatibility</name>
+  <description>
+    Interfaces to be implemented in order to smooth
+    over hadoop version differences
+  </description>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>secondPartTestsExecution</id>
+            <phase>test</phase>
+            <goals>
+              <goal>test</goal>
+            </goals>
+            <configuration>
+              <skip>true</skip>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-hadoop-compat</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-core</artifactId>
+      <version>${hadoop-one.version}</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>hsqldb</groupId>
+          <artifactId>hsqldb</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>net.sf.kosmosfs</groupId>
+          <artifactId>kfs</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.eclipse.jdt</groupId>
+          <artifactId>core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>net.java.dev.jets3t</groupId>
+          <artifactId>jets3t</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>oro</groupId>
+          <artifactId>oro</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-test</artifactId>
+      <version>${hadoop-one.version}</version>
+      <optional>true</optional>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
diff --git hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics2/ReplicationMetricsSourceImpl.java hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics2/ReplicationMetricsSourceImpl.java
new file mode 100644
index 0000000..744f7aa
--- /dev/null
+++ hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics2/ReplicationMetricsSourceImpl.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver.metrics2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.MetricsBuilder;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricMutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MetricMutableGaugeLong;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Hadoop1 implementation of ReplicationMetricsSource, backed by the
+ * metrics2 framework shipped with hadoop 1.
+ */
+public class ReplicationMetricsSourceImpl implements ReplicationMetricsSource, MetricsSource {
+
+ public static final Log LOG = LogFactory.getLog(ReplicationMetricsSourceImpl.class);
+ public static final String METRICS_NAME = "ReplicationMetrics";
+ private static final String METRICS_CONTEXT = "replicationmetrics";
+
+ private ConcurrentMap<String, MetricMutableGaugeLong> gauges =
+ new ConcurrentHashMap<String, MetricMutableGaugeLong>();
+ private ConcurrentMap<String, MetricMutableCounterLong> counters =
+ new ConcurrentHashMap<String, MetricMutableCounterLong>();
+
+ ReplicationMetricsSourceImpl() {
+ DefaultMetricsSystem.initialize("hbase");
+ DefaultMetricsSystem.registerSource(METRICS_NAME, "Metrics about hbase replication", this);
+ }
+
+ /**
+ * Set a single gauge to a value.
+ *
+ * @param gaugeName gauge name
+ * @param value the new value of the gauge.
+ */
+ public void setGauge(String gaugeName, long value) {
+ MetricMutableGaugeLong gaugeInt = getLongGauge(gaugeName, value);
+ gaugeInt.set(value);
+ }
+
+ /**
+ * Add some amount to a gauge.
+ *
+ * @param gaugeName The name of the gauge to increment.
+ * @param delta The amount to increment the gauge by.
+ */
+ public void incGauge(String gaugeName, long delta) {
+ MetricMutableGaugeLong gaugeInt = getLongGauge(gaugeName, 0L);
+ gaugeInt.incr(delta);
+ }
+
+ /**
+ * Decrease the value of a named gauge.
+ *
+ * @param gaugeName The name of the gauge.
+ * @param delta the amount to subtract from the gauge value.
+ */
+ public void decGauge(String gaugeName, long delta) {
+ MetricMutableGaugeLong gaugeInt = getLongGauge(gaugeName, 0L);
+ gaugeInt.decr(delta);
+ }
+
+ /**
+ * Increment a named counter by some value.
+ *
+ * @param key the name of the counter
+ * @param delta the amount to increment by
+ */
+ public void incCounters(String key, long delta) {
+ MetricMutableCounterLong counter = getLongCounter(key, 0L);
+ counter.incr(delta);
+ }
+
+ /**
+ * Remove a named gauge.
+ *
+ * @param key
+ */
+ public void removeGauge(String key) {
+ gauges.remove(key);
+ }
+
+ /**
+ * Remove a named counter.
+ *
+ * @param key
+ */
+ public void removeCounter(String key) {
+ counters.remove(key);
+ }
+
+ /**
+ * Method to export all the metrics.
+ *
+ * @param metricsBuilder Builder to accept metrics
+ * @param all push all or only changed?
+ */
+ @Override
+ public void getMetrics(MetricsBuilder metricsBuilder, boolean all) {
+
+ MetricsRecordBuilder rb = metricsBuilder.addRecord(METRICS_NAME).setContext(METRICS_CONTEXT);
+
+ for (Map.Entry<String, MetricMutableCounterLong> entry : counters.entrySet()) {
+ entry.getValue().snapshot(rb, all);
+ }
+ for (Map.Entry<String, MetricMutableGaugeLong> entry : gauges.entrySet()) {
+ entry.getValue().snapshot(rb, all);
+ }
+
+ }
+
+ /**
+ * Get a MetricMutableGaugeLong from the storage. If it is not there atomically put it.
+ *
+ * @param gaugeName name of the gauge to create or get.
+ * @param potentialStartingValue value of the new gauge if we have to create it.
+ * @return a new or existing gauge, never null.
+ */
+ private MetricMutableGaugeLong getLongGauge(String gaugeName, long potentialStartingValue) {
+ //Try and get the gauge.
+ MetricMutableGaugeLong gaugeInt = gauges.get(gaugeName);
+
+ //If it's not there then try and put a new one in the storage.
+ if (gaugeInt == null) {
+
+ //Create the potential new gauge.
+ MetricMutableGaugeLong newGauge = new MetricMutableGaugeLong(gaugeName, "",
+ potentialStartingValue);
+
+ // Try and put the gauge in. This is atomic.
+ gaugeInt = gauges.putIfAbsent(gaugeName, newGauge);
+
+ //If the value we get back is null then the put was successful and we will return that.
+ //otherwise gaugeInt should contain the thing that was in before the put could be completed.
+ if (gaugeInt == null) {
+ gaugeInt = newGauge;
+ }
+ }
+ return gaugeInt;
+ }
+
+ /**
+ * Get a MetricMutableCounterLong from the storage. If it is not there atomically put it.
+ *
+ * @param counterName Name of the counter to get
+ * @param potentialStartingValue starting value if we have to create a new counter
+ * @return a new or existing counter, never null.
+ */
+ private MetricMutableCounterLong getLongCounter(String counterName, long potentialStartingValue) {
+ //See getLongGauge for description on how this works.
+ MetricMutableCounterLong counter = counters.get(counterName);
+ if (counter == null) {
+ MetricMutableCounterLong newCounter =
+ new MetricMutableCounterLong(counterName, "", potentialStartingValue);
+ counter = counters.putIfAbsent(counterName, newCounter);
+ if (counter == null) {
+ counter = newCounter;
+ }
+ }
+ return counter;
+ }
+}
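The getLongGauge/getLongCounter helpers above use the lock-free get-or-create idiom for ConcurrentMap; a standalone sketch of the same idiom (class and field names hypothetical):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.atomic.AtomicLong;

    public class GetOrCreate {
      private final ConcurrentMap<String, AtomicLong> counters =
          new ConcurrentHashMap<String, AtomicLong>();

      AtomicLong counter(String name) {
        AtomicLong c = counters.get(name);
        if (c == null) {
          AtomicLong created = new AtomicLong();
          c = counters.putIfAbsent(name, created); // null means our put won
          if (c == null) {
            c = created;
          }
        }
        return c; // racing threads all end up with the same instance
      }
    }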
diff --git hbase-hadoop1-compat/src/main/resources/META-INF/services/org.apache.hadoop.hbase.replication.regionserver.metrics2.ReplicationMetricsSource hbase-hadoop1-compat/src/main/resources/META-INF/services/org.apache.hadoop.hbase.replication.regionserver.metrics2.ReplicationMetricsSource
new file mode 100644
index 0000000..b49b27a
--- /dev/null
+++ hbase-hadoop1-compat/src/main/resources/META-INF/services/org.apache.hadoop.hbase.replication.regionserver.metrics2.ReplicationMetricsSource
@@ -0,0 +1 @@
+org.apache.hadoop.hbase.replication.regionserver.metrics2.ReplicationMetricsSourceImpl
\ No newline at end of file
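The file's name is the fully qualified interface name and its single line names the implementation to load. This is the same contract read by the JDK's java.util.ServiceLoader, which could resolve it equivalently, though it additionally requires a public no-arg constructor (the constructor above is package-private, which is why the patch ships its own ResourceFinder-based lookup):

    ServiceLoader<ReplicationMetricsSource> loader =
        ServiceLoader.load(ReplicationMetricsSource.class);
    ReplicationMetricsSource source = loader.iterator().next(); // first provider found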
diff --git hbase-hadoop2-compat/pom.xml hbase-hadoop2-compat/pom.xml
new file mode 100644
index 0000000..bed591c
--- /dev/null
+++ hbase-hadoop2-compat/pom.xml
@@ -0,0 +1,105 @@
+<?xml version="1.0"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>hbase</artifactId>
+    <groupId>org.apache.hbase</groupId>
+    <version>0.95-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>hbase-hadoop2-compat</artifactId>
+  <name>HBase - Hadoop Two Compatibility</name>
+  <description>
+    Interfaces to be implemented in order to smooth
+    over hadoop version differences
+  </description>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>secondPartTestsExecution</id>
+            <phase>test</phase>
+            <goals>
+              <goal>test</goal>
+            </goals>
+            <configuration>
+              <skip>true</skip>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>create-mrapp-generated-classpath</id>
+            <phase>generate-test-resources</phase>
+            <goals>
+              <goal>build-classpath</goal>
+            </goals>
+            <configuration>
+              <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-hadoop-compat</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>${hadoop-two.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-annotations</artifactId>
+      <version>${hadoop-two.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+      <version>${hadoop-two.version}</version>
+    </dependency>
+  </dependencies>
+</project>
diff --git hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics2/ReplicationMetricsSourceImpl.java hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics2/ReplicationMetricsSourceImpl.java
new file mode 100644
index 0000000..38d03fc
--- /dev/null
+++ hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics2/ReplicationMetricsSourceImpl.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver.metrics2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Hadoop2 implementation of ReplicationMetricsSource, backed by the
+ * metrics2 framework shipped with hadoop 2.
+ */
+public class ReplicationMetricsSourceImpl implements ReplicationMetricsSource, MetricsSource {
+
+ public static final Log LOG = LogFactory.getLog(ReplicationMetricsSourceImpl.class);
+ public static final String METRICS_NAME = "ReplicationMetrics";
+ private static final String METRICS_CONTEXT = "replicationmetrics";
+
+ private ConcurrentMap<String, MutableGaugeLong> gauges =
+ new ConcurrentHashMap<String, MutableGaugeLong>();
+ private ConcurrentMap<String, MutableCounterLong> counters =
+ new ConcurrentHashMap<String, MutableCounterLong>();
+
+ private MetricsRegistry mr;
+
+
+ ReplicationMetricsSourceImpl() {
+ DefaultMetricsSystem.initialize("hbase");
+ DefaultMetricsSystem.instance().register(METRICS_NAME, "Metrics about hbase replication", this);
+ mr = new MetricsRegistry(METRICS_NAME);
+ }
+
+ /**
+ * Set a single gauge to a value.
+ *
+ * @param gaugeName gauge name
+ * @param value the new value of the gauge.
+ */
+ public void setGauge(String gaugeName, long value) {
+ MutableGaugeLong gaugeInt = getLongGauge(gaugeName, value);
+ gaugeInt.set(value);
+ }
+
+ /**
+ * Add some amount to a gauge.
+ *
+ * @param gaugeName The name of the gauge to increment.
+ * @param delta The amount to increment the gauge by.
+ */
+ public void incGauge(String gaugeName, long delta) {
+ MutableGaugeLong gaugeInt = getLongGauge(gaugeName, 0L);
+ gaugeInt.incr(delta);
+ }
+
+ /**
+ * Decrease the value of a named gauge.
+ *
+ * @param gaugeName The name of the gauge.
+ * @param delta the amount to subtract from the gauge value.
+ */
+ public void decGauge(String gaugeName, long delta) {
+ MutableGaugeLong gaugeInt = getLongGauge(gaugeName, 0L);
+ gaugeInt.decr(delta);
+ }
+
+ /**
+ * Increment a named counter by some value.
+ *
+ * @param key the name of the counter
+ * @param delta the amount to increment by
+ */
+ public void incCounters(String key, long delta) {
+ MutableCounterLong counter = getLongCounter(key, 0L);
+ counter.incr(delta);
+ }
+
+ /**
+ * Remove a named gauge.
+ *
+ * @param key
+ */
+ public void removeGauge(String key) {
+ gauges.remove(key);
+ }
+
+ /**
+ * Remove a named counter.
+ *
+ * @param key
+ */
+ public void removeCounter(String key) {
+ counters.remove(key);
+ }
+
+
+
+ @Override
+ public void getMetrics(MetricsCollector metricsCollector, boolean all) {
+ MetricsRecordBuilder rb = metricsCollector.addRecord(METRICS_NAME).setContext(METRICS_CONTEXT);
+
+ for (Map.Entry<String, MutableCounterLong> entry : counters.entrySet()) {
+ entry.getValue().snapshot(rb, all);
+ }
+ for (Map.Entry<String, MutableGaugeLong> entry : gauges.entrySet()) {
+ entry.getValue().snapshot(rb, all);
+ }
+
+ }
+
+
+ /**
+ * Get a MetricMutableGaugeLong from the storage. If it is not there atomically put it.
+ *
+ * @param gaugeName name of the gauge to create or get.
+ * @param potentialStartingValue value of the new gauge if we have to create it.
+ * @return a new or existing gauge, never null.
+ */
+ private MutableGaugeLong getLongGauge(String gaugeName, long potentialStartingValue) {
+ //Try and get the gauge.
+ MutableGaugeLong gaugeInt = gauges.get(gaugeName);
+
+ //If it's not there then try and put a new one in the storage.
+ if (gaugeInt == null) {
+
+ //Create the potential new gauge.
+ MutableGaugeLong newGauge = mr.newGauge(gaugeName, "",
+ potentialStartingValue);
+
+ // Try and put the gauge in. This is atomic.
+ gaugeInt = gauges.putIfAbsent(gaugeName, newGauge);
+
+ //If the value we get back is null then the put was successful and we will return that.
+ //otherwise gaugeInt should contain the thing that was in before the put could be completed.
+ if (gaugeInt == null) {
+ gaugeInt = newGauge;
+ }
+ }
+ return gaugeInt;
+ }
+
+ /**
+ * Get a MetricMutableCounterLong from the storage. If it is not there atomically put it.
+ *
+ * @param counterName Name of the counter to get
+ * @param potentialStartingValue starting value if we have to create a new counter
+ * @return a new or existing counter, never null.
+ */
+ private MutableCounterLong getLongCounter(String counterName, long potentialStartingValue) {
+ //See getLongGauge for description on how this works.
+ MutableCounterLong counter = counters.get(counterName);
+ if (counter == null) {
+ MutableCounterLong newCounter =
+ mr.newCounter(counterName, "", potentialStartingValue);
+ counter = counters.putIfAbsent(counterName, newCounter);
+ if (counter == null) {
+ counter = newCounter;
+ }
+ }
+ return counter;
+ }
+}
diff --git hbase-hadoop2-compat/src/main/resources/META-INF/services/org.apache.hadoop.hbase.replication.regionserver.metrics2.ReplicationMetricsSource hbase-hadoop2-compat/src/main/resources/META-INF/services/org.apache.hadoop.hbase.replication.regionserver.metrics2.ReplicationMetricsSource
new file mode 100644
index 0000000..b49b27a
--- /dev/null
+++ hbase-hadoop2-compat/src/main/resources/META-INF/services/org.apache.hadoop.hbase.replication.regionserver.metrics2.ReplicationMetricsSource
@@ -0,0 +1 @@
+org.apache.hadoop.hbase.replication.regionserver.metrics2.ReplicationMetricsSourceImpl
\ No newline at end of file
diff --git hbase-server/pom.xml hbase-server/pom.xml
index 4359d9c..850aa03 100644
--- hbase-server/pom.xml
+++ hbase-server/pom.xml
@@ -288,6 +288,15 @@
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-common</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-hadoop-compat</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>${compat.module}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>io.netty</groupId>
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index 0b6987e..3016e07 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -19,13 +19,6 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.Map.Entry;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -39,8 +32,16 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.regionserver.metrics2.ReplicationSinkMetrics;
import org.apache.hadoop.hbase.util.Bytes;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
/**
* This class is responsible for replicating the edits coming
* from another cluster.
@@ -133,7 +134,7 @@ public class ReplicationSink {
}
this.metrics.setAgeOfLastAppliedOp(
entries[entries.length-1].getKey().getWriteTime());
- this.metrics.appliedBatchesRate.inc(1);
+ this.metrics.applyBatch(entries.length);
LOG.info("Total replicated: " + totalReplicated);
} catch (IOException ex) {
LOG.error("Unable to accept edit because:", ex);
@@ -173,7 +174,6 @@ public class ReplicationSink {
try {
table = this.pool.getTable(tableName);
table.batch(rows);
- this.metrics.appliedOpsRate.inc(rows.size());
} catch (InterruptedException ix) {
throw new IOException(ix);
} finally {
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkMetrics.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkMetrics.java
deleted file mode 100644
index bf324e2..0000000
--- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkMetrics.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.metrics.MetricsRate;
-import org.apache.hadoop.metrics.MetricsContext;
-import org.apache.hadoop.metrics.MetricsRecord;
-import org.apache.hadoop.metrics.MetricsUtil;
-import org.apache.hadoop.metrics.Updater;
-import org.apache.hadoop.metrics.jvm.JvmMetrics;
-import org.apache.hadoop.metrics.util.MetricsIntValue;
-import org.apache.hadoop.metrics.util.MetricsLongValue;
-import org.apache.hadoop.metrics.util.MetricsRegistry;
-
-/**
- * This class is for maintaining the various replication statistics
- * for a sink and publishing them through the metrics interfaces.
- */
-@InterfaceAudience.Private
-public class ReplicationSinkMetrics implements Updater {
- private final MetricsRecord metricsRecord;
- private MetricsRegistry registry = new MetricsRegistry();
-
- /** Rate of operations applied by the sink */
- public final MetricsRate appliedOpsRate =
- new MetricsRate("appliedOpsRate", registry);
-
- /** Rate of batches (of operations) applied by the sink */
- public final MetricsRate appliedBatchesRate =
- new MetricsRate("appliedBatchesRate", registry);
-
- /** Age of the last operation that was applied by the sink */
- private final MetricsLongValue ageOfLastAppliedOp =
- new MetricsLongValue("ageOfLastAppliedOp", registry);
-
- /**
- * Constructor used to register the metrics
- */
- public ReplicationSinkMetrics() {
- MetricsContext context = MetricsUtil.getContext("hbase");
- String name = Thread.currentThread().getName();
- metricsRecord = MetricsUtil.createRecord(context, "replication");
- metricsRecord.setTag("RegionServer", name);
- context.registerUpdater(this);
- // export for JMX
- new ReplicationStatistics(this.registry, "ReplicationSink");
- }
-
- /**
- * Set the age of the last edit that was applied
- * @param timestamp write time of the edit
- */
- public void setAgeOfLastAppliedOp(long timestamp) {
- ageOfLastAppliedOp.set(System.currentTimeMillis() - timestamp);
- }
- @Override
- public void doUpdates(MetricsContext metricsContext) {
- synchronized (this) {
- this.appliedOpsRate.pushMetric(this.metricsRecord);
- this.appliedBatchesRate.pushMetric(this.metricsRecord);
- this.ageOfLastAppliedOp.pushMetric(this.metricsRecord);
- }
- this.metricsRecord.update();
- }
-}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 2c328ea..28b997d 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
+import org.apache.hadoop.hbase.replication.regionserver.metrics2.ReplicationSourceMetrics;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
@@ -238,7 +239,7 @@ public class ReplicationSource extends Thread
@Override
public void enqueueLog(Path log) {
this.queue.put(log);
- this.metrics.sizeOfLogQueue.set(queue.size());
+ this.metrics.setSizeOfLogQueue(queue.size());
}
@Override
@@ -246,6 +247,7 @@ public class ReplicationSource extends Thread
connectToPeers();
// We were stopped while looping to connect to sinks, just abort
if (!this.isActive()) {
+ metrics.clear();
return;
}
// delay this until we are in an asynchronous thread
@@ -376,6 +378,7 @@ public class ReplicationSource extends Thread
}
}
LOG.debug("Source exiting " + peerId);
+ metrics.clear();
}
/**
@@ -393,7 +396,7 @@ public class ReplicationSource extends Thread
HLog.Entry entry = this.reader.next(this.entriesArray[currentNbEntries]);
while (entry != null) {
WALEdit edit = entry.getEdit();
- this.metrics.logEditsReadRate.inc(1);
+ this.metrics.incrLogEditsRead();
seenEntries++;
// Remove all KVs that should not be replicated
HLogKey logKey = entry.getKey();
@@ -415,7 +418,7 @@ public class ReplicationSource extends Thread
currentNbOperations += countDistinctRowKeys(edit);
currentNbEntries++;
} else {
- this.metrics.logEditsFilteredRate.inc(1);
+ this.metrics.incrLogEditsFiltered();
}
}
// Stop if too many entries or too big
@@ -455,7 +458,7 @@ public class ReplicationSource extends Thread
try {
if (this.currentPath == null) {
this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS);
- this.metrics.sizeOfLogQueue.set(queue.size());
+ this.metrics.setSizeOfLogQueue(queue.size());
}
} catch (InterruptedException e) {
LOG.warn("Interrupted while reading edits", e);
@@ -616,9 +619,7 @@ public class ReplicationSource extends Thread
this.lastLoggedPosition = this.position;
}
this.totalReplicatedEdits += currentNbEntries;
- this.metrics.shippedBatchesRate.inc(1);
- this.metrics.shippedOpsRate.inc(
- this.currentNbOperations);
+ this.metrics.shipBatch(this.currentNbOperations);
this.metrics.setAgeOfLastShippedOp(
this.entriesArray[this.entriesArray.length-1].getKey().getWriteTime());
LOG.debug("Replicated in total: " + this.totalReplicatedEdits);
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceMetrics.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceMetrics.java
deleted file mode 100644
index 543e15d..0000000
--- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceMetrics.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-import java.io.UnsupportedEncodingException;
-import java.net.URLEncoder;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.metrics.MetricsRate;
-import org.apache.hadoop.metrics.MetricsContext;
-import org.apache.hadoop.metrics.MetricsRecord;
-import org.apache.hadoop.metrics.MetricsUtil;
-import org.apache.hadoop.metrics.Updater;
-import org.apache.hadoop.metrics.jvm.JvmMetrics;
-import org.apache.hadoop.metrics.util.MetricsIntValue;
-import org.apache.hadoop.metrics.util.MetricsLongValue;
-import org.apache.hadoop.metrics.util.MetricsRegistry;
-
-/**
- * This class is for maintaining the various replication statistics
- * for a source and publishing them through the metrics interfaces.
- */
-@InterfaceAudience.Private
-public class ReplicationSourceMetrics implements Updater {
- private final MetricsRecord metricsRecord;
- private MetricsRegistry registry = new MetricsRegistry();
-
- /** Rate of shipped operations by the source */
- public final MetricsRate shippedOpsRate =
- new MetricsRate("shippedOpsRate", registry);
-
- /** Rate of shipped batches by the source */
- public final MetricsRate shippedBatchesRate =
- new MetricsRate("shippedBatchesRate", registry);
-
- /** Rate of log entries (can be multiple Puts) read from the logs */
- public final MetricsRate logEditsReadRate =
- new MetricsRate("logEditsReadRate", registry);
-
- /** Rate of log entries filtered by the source */
- public final MetricsRate logEditsFilteredRate =
- new MetricsRate("logEditsFilteredRate", registry);
-
- /** Age of the last operation that was shipped by the source */
- private final MetricsLongValue ageOfLastShippedOp =
- new MetricsLongValue("ageOfLastShippedOp", registry);
-
- /**
- * Current size of the queue of logs to replicate,
- * excluding the one being processed at the moment
- */
- public final MetricsIntValue sizeOfLogQueue =
- new MetricsIntValue("sizeOfLogQueue", registry);
-
- // It's a little dirty to preset the age to now since if we fail
- // to replicate the very first time then it will show that age instead
- // of nothing (although that might not be good either).
- private long lastTimestampForAge = System.currentTimeMillis();
-
- /**
- * Constructor used to register the metrics
- * @param id Name of the source this class is monitoring
- */
- public ReplicationSourceMetrics(String id) {
- MetricsContext context = MetricsUtil.getContext("hbase");
- String name = Thread.currentThread().getName();
- metricsRecord = MetricsUtil.createRecord(context, "replication");
- metricsRecord.setTag("RegionServer", name);
- context.registerUpdater(this);
- try {
- id = URLEncoder.encode(id, "UTF8");
- } catch (UnsupportedEncodingException e) {
- id = "CAN'T ENCODE UTF8";
- }
- // export for JMX
- new ReplicationStatistics(this.registry, "ReplicationSource for " + id);
- }
-
- /**
- * Set the age of the last edit that was shipped
- * @param timestamp write time of the edit
- */
- public void setAgeOfLastShippedOp(long timestamp) {
- lastTimestampForAge = timestamp;
- ageOfLastShippedOp.set(System.currentTimeMillis() - lastTimestampForAge);
- }
-
- /**
- * Convenience method to use the last given timestamp to refresh the age
- * of the last edit. Used when replication fails and need to keep that
- * metric accurate.
- */
- public void refreshAgeOfLastShippedOp() {
- setAgeOfLastShippedOp(lastTimestampForAge);
- }
-
- @Override
- public void doUpdates(MetricsContext metricsContext) {
- synchronized (this) {
- this.shippedOpsRate.pushMetric(this.metricsRecord);
- this.shippedBatchesRate.pushMetric(this.metricsRecord);
- this.logEditsReadRate.pushMetric(this.metricsRecord);
- this.logEditsFilteredRate.pushMetric(this.metricsRecord);
- this.ageOfLastShippedOp.pushMetric(this.metricsRecord);
- this.sizeOfLogQueue.pushMetric(this.metricsRecord);
- }
- this.metricsRecord.update();
- }
-}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatistics.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatistics.java
deleted file mode 100644
index ceeff5f..0000000
--- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatistics.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.metrics.MetricsMBeanBase;
-import org.apache.hadoop.metrics.util.MBeanUtil;
-import org.apache.hadoop.metrics.util.MetricsRegistry;
-
-import javax.management.ObjectName;
-
-/**
- * Exports metrics recorded by {@link ReplicationSourceMetrics} as an MBean
- * for JMX monitoring.
- */
-@InterfaceAudience.Private
-public class ReplicationStatistics extends MetricsMBeanBase {
-
- private final ObjectName mbeanName;
-
- /**
- * Constructor to register the MBean
- * @param registry which rehistry to use
- * @param name name to get to this bean
- */
- public ReplicationStatistics(MetricsRegistry registry, String name) {
- super(registry, name);
- mbeanName = MBeanUtil.registerMBean("Replication", name, this);
- }
-}
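
With metrics2 the hand-rolled MBean above becomes unnecessary: registering a source with the metrics system publishes it over JMX automatically. Roughly, against the Hadoop 2 API (the concrete registration lives in the compat modules, outside this excerpt):

```java
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;

public class MetricsBootstrap {
  public static void main(String[] args) {
    // Initializing the metrics system also wires up JMX publication;
    // sources appear under the "Hadoop:service=<prefix>,name=<name>" domain.
    MetricsSystem ms = DefaultMetricsSystem.initialize("HBase");
    // register(name, description, source) would make a metrics source visible
    // to all configured sinks and to JMX, replacing ReplicationStatistics:
    // ms.register("Replication", "HBase replication metrics", replicationSource);
  }
}
```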
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics2/ReplicationSinkMetrics.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics2/ReplicationSinkMetrics.java
new file mode 100644
index 0000000..8f9b9a5
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics2/ReplicationSinkMetrics.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver.metrics2;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * This class is for maintaining the various replication statistics for a sink and publishing them
+ * through the metrics interfaces.
+ */
+@InterfaceAudience.Private
+public class ReplicationSinkMetrics {
+
+ public static final String SINK_AGE_OF_LAST_APPLIED_OP = "sink.ageOfLastAppliedOp";
+ public static final String SINK_APPLIED_BATCHES = "sink.appliedBatches";
+ public static final String SINK_APPLIED_OPS = "sink.appliedOps";
+
+ private ReplicationMetricsSource rms;
+
+ public ReplicationSinkMetrics() {
+ rms = ReplicationMetricsSourceFactory.getInstance();
+ }
+
+ /**
+ * Set the age of the last applied operation
+ *
+ * @param timestamp The timestamp of the last operation applied.
+ */
+ public void setAgeOfLastAppliedOp(long timestamp) {
+ long age = System.currentTimeMillis() - timestamp;
+ rms.setGauge(SINK_AGE_OF_LAST_APPLIED_OP, age);
+ }
+
+ /**
+ * Convenience method to update metrics when a batch of operations is applied.
+ *
+ * @param batchSize the number of operations applied in the batch
+ */
+ public void applyBatch(long batchSize) {
+ rms.incCounters(SINK_APPLIED_BATCHES, 1);
+ rms.incCounters(SINK_APPLIED_OPS, batchSize);
+ }
+
+}
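
For reference, this is how the class is driven by ReplicationSink after this patch, condensed from the hunks earlier in the diff (the wrapper class and method name below are illustrative):

```java
import org.apache.hadoop.hbase.replication.regionserver.metrics2.ReplicationSinkMetrics;

public class SinkMetricsUsage {
  private final ReplicationSinkMetrics metrics = new ReplicationSinkMetrics();

  // Mirrors the ReplicationSink hunk above: a single applyBatch call per batch
  // replaces the old per-row appliedOpsRate/appliedBatchesRate increments.
  void onBatchApplied(int batchSize, long lastWriteTime) {
    metrics.setAgeOfLastAppliedOp(lastWriteTime);
    metrics.applyBatch(batchSize);
  }
}
```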
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics2/ReplicationSourceMetrics.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics2/ReplicationSourceMetrics.java
new file mode 100644
index 0000000..85d6057
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics2/ReplicationSourceMetrics.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver.metrics2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * This class is for maintaining the various replication statistics for a source and publishing them
+ * through the metrics interfaces.
+ */
+@InterfaceAudience.Private
+public class ReplicationSourceMetrics {
+
+ public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue";
+ public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp";
+ public static final String SOURCE_LOG_EDITS_READ = "source.logEditsRead";
+ public static final String SOURCE_LOG_EDITS_FILTERED = "source.logEditsFiltered";
+ public static final String SOURCE_SHIPPED_BATCHES = "source.shippedBatches";
+ public static final String SOURCE_SHIPPED_OPS = "source.shippedOps";
+
+ public static final Log LOG = LogFactory.getLog(ReplicationSourceMetrics.class);
+ private String id;
+
+ private long lastTimestamp = 0;
+ private int lastQueueSize = 0;
+
+ private String sizeOfLogQueueKey;
+ private String ageOfLastShippedOpKey;
+ private String logEditsReadKey;
+ private String logEditsFilteredKey;
+ private final String shippedBatchesKey;
+ private final String shippedOpsKey;
+
+ private ReplicationMetricsSource rms;
+
+ /**
+ * Constructor used to register the metrics
+ *
+ * @param id Name of the source this class is monitoring
+ */
+ public ReplicationSourceMetrics(String id) {
+ this.id = id;
+
+ sizeOfLogQueueKey = "source." + id + ".sizeOfLogQueue";
+ ageOfLastShippedOpKey = "source." + id + ".ageOfLastShippedOp";
+ logEditsReadKey = "source." + id + ".logEditsRead";
+ logEditsFilteredKey = "source." + id + ".logEditsFiltered";
+ shippedBatchesKey = "source." + this.id + ".shippedBatches";
+ shippedOpsKey = "source." + this.id + ".shippedOps";
+ rms = ReplicationMetricsSourceFactory.getInstance();
+ }
+
+ /**
+ * Set the age of the last edit that was shipped
+ *
+ * @param timestamp write time of the edit
+ */
+ public void setAgeOfLastShippedOp(long timestamp) {
+ long age = System.currentTimeMillis() - timestamp;
+ rms.setGauge(ageOfLastShippedOpKey, age);
+ rms.setGauge(SOURCE_AGE_OF_LAST_SHIPPED_OP, age);
+ this.lastTimestamp = timestamp;
+ }
+
+ /**
+ * Convenience method to use the last given timestamp to refresh the age of the last edit. Used
+ * when replication fails and need to keep that metric accurate.
+ */
+ public void refreshAgeOfLastShippedOp() {
+ if (this.lastTimestamp > 0) {
+ setAgeOfLastShippedOp(this.lastTimestamp);
+ }
+ }
+
+ /**
+ * Set the size of the log queue
+ *
+ * @param size the size.
+ */
+ public void setSizeOfLogQueue(int size) {
+ rms.setGauge(sizeOfLogQueueKey, size);
+ rms.incGauge(SOURCE_SIZE_OF_LOG_QUEUE, size - lastQueueSize);
+ lastQueueSize = size;
+ }
+
+ /**
+ * Add to the number of log edits read
+ *
+ * @param delta the number of log edits read.
+ */
+ private void incrLogEditsRead(long delta) {
+ rms.incCounters(logEditsReadKey, delta);
+ rms.incCounters(SOURCE_LOG_EDITS_READ, delta);
+ }
+
+ /** Increment the number of log edits read by one. */
+ public void incrLogEditsRead() {
+ incrLogEditsRead(1);
+ }
+
+ /**
+ * Add to the number of log edits filtered
+ *
+ * @param delta the number filtered.
+ */
+ private void incrLogEditsFiltered(long delta) {
+ rms.incCounters(logEditsFilteredKey, delta);
+ rms.incCounters(SOURCE_LOG_EDITS_FILTERED, delta);
+ }
+
+ /** Increment the number of log edits filtered by one. */
+ public void incrLogEditsFiltered() {
+ incrLogEditsFiltered(1);
+ }
+
+ /**
+ * Convenience method to update metrics due to shipping a batch of logs.
+ *
+ * @param batchSize the size of the batch that was shipped to sinks.
+ */
+ public void shipBatch(long batchSize) {
+ rms.incCounters(shippedBatchesKey, 1);
+ rms.incCounters(SOURCE_SHIPPED_BATCHES, 1);
+ rms.incCounters(shippedOpsKey, batchSize);
+ rms.incCounters(SOURCE_SHIPPED_OPS, batchSize);
+ }
+
+ /** Removes all metrics about this Source. */
+ public void clear() {
+ rms.removeGauge(sizeOfLogQueueKey);
+ rms.decGauge(SOURCE_SIZE_OF_LOG_QUEUE, lastQueueSize);
+ lastQueueSize = 0;
+ rms.removeGauge(ageOfLastShippedOpKey);
+
+ rms.removeCounter(logEditsFilteredKey);
+ rms.removeCounter(logEditsReadKey);
+ }
+}
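
Each update above lands twice: once under a per-peer key such as "source.1.shippedOps" and once under the cluster-wide aggregate "source.shippedOps". A sketch of the effect, assuming a compat implementation is on the classpath (via the ServiceLoader entry earlier in this patch) so the factory can resolve a ReplicationMetricsSource:

```java
import org.apache.hadoop.hbase.replication.regionserver.metrics2.ReplicationSourceMetrics;

public class SourceMetricsUsage {
  public static void main(String[] args) {
    ReplicationSourceMetrics peer1 = new ReplicationSourceMetrics("1");
    ReplicationSourceMetrics peer2 = new ReplicationSourceMetrics("2");

    // Each call increments both the per-peer counter ("source.1.shippedOps")
    // and the cluster-wide aggregate ("source.shippedOps").
    peer1.shipBatch(100);  // source.1.shippedOps += 100, source.shippedOps += 100
    peer2.shipBatch(50);   // source.2.shippedOps += 50,  source.shippedOps now 150

    // When a source exits, its per-peer gauges are removed and its contribution
    // to the shared sizeOfLogQueue gauge is subtracted.
    peer1.clear();
  }
}
```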
diff --git pom.xml pom.xml
index 2cd0efc..c95762d 100644
--- pom.xml
+++ pom.xml
@@ -41,6 +41,9 @@
   <url>http://hbase.apache.org</url>
   <modules>
     <module>hbase-server</module>
+    <module>hbase-hadoop2-compat</module>
+    <module>hbase-hadoop1-compat</module>
+    <module>hbase-hadoop-compat</module>
     <module>hbase-common</module>
     <module>hbase-it</module>
@@ -600,7 +603,7 @@
           <tarLongFileMode>gnu</tarLongFileMode>
           <appendAssemblyId>false</appendAssemblyId>
           <descriptors>
-            <descriptor>src/assembly/all.xml</descriptor>
+            <descriptor>${assembly.file}</descriptor>
           </descriptors>
@@ -786,6 +789,8 @@
     <compileSource>1.6</compileSource>
     <hbase.version>${project.version}</hbase.version>
+    <hadoop-two.version>2.0.0-alpha</hadoop-two.version>
+    <hadoop-one.version>1.0.3</hadoop-one.version>
     <avro.version>1.5.3</avro.version>
     <commons-cli.version>1.2</commons-cli.version>
     <commons-codec.version>1.4</commons-codec.version>
@@ -837,6 +842,7 @@
     <server.test.jar>hbase-server-${project.version}-tests.jar</server.test.jar>
     <surefire.version>2.12-TRUNK-HBASE-2</surefire.version>
     <surefire.provider>surefire-junit47</surefire.provider>
+    <compat.module>hbase-hadoop1-compat</compat.module>
     <surefire.skipFirstPart>false</surefire.skipFirstPart>
     <surefire.skipSecondPart>false</surefire.skipSecondPart>
@@ -866,6 +872,16 @@
         <version>${project.version}</version>
       </dependency>
       <dependency>
+        <groupId>org.apache.hbase</groupId>
+        <artifactId>hbase-hadoop-compat</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.hbase</groupId>
+        <artifactId>${compat.module}</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
         <artifactId>hbase-server</artifactId>
         <groupId>org.apache.hbase</groupId>
         <version>${project.version}</version>
@@ -1227,8 +1243,10 @@
-        <hadoop.version>1.0.3</hadoop.version>
+        <hadoop.version>${hadoop-one.version}</hadoop.version>
         <slf4j.version>1.4.3</slf4j.version>
+        <compat.module>hbase-hadoop1-compat</compat.module>
+        <assembly.file>src/assembly/one.xml</assembly.file>
@@ -1282,26 +1300,28 @@
         <slf4j.version>1.6.1</slf4j.version>
-        <hadoop.version>2.0.0-alpha</hadoop.version>
+        <hadoop.version>${hadoop-two.version}</hadoop.version>
+        <compat.module>hbase-hadoop2-compat</compat.module>
+        <assembly.file>src/assembly/two.xml</assembly.file>
       </properties>
       <dependencies>
         <dependency>
           <groupId>org.apache.hadoop</groupId>
           <artifactId>hadoop-client</artifactId>
-          <version>${hadoop.version}</version>
+          <version>${hadoop-two.version}</version>
         </dependency>
         <dependency>
           <groupId>org.apache.hadoop</groupId>
           <artifactId>hadoop-annotations</artifactId>
-          <version>${hadoop.version}</version>
+          <version>${hadoop-two.version}</version>
         </dependency>
         <dependency>
           <groupId>org.apache.hadoop</groupId>
           <artifactId>hadoop-minicluster</artifactId>
-          <version>${hadoop.version}</version>
+          <version>${hadoop-two.version}</version>
diff --git src/assembly/all.xml src/assembly/all.xml
deleted file mode 100644
index 5a9866b..0000000
--- src/assembly/all.xml
+++ /dev/null
@@ -1,153 +0,0 @@
-<?xml version="1.0"?>
-<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1"
-  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1
-                      http://maven.apache.org/xsd/assembly-1.1.xsd">
-  <id>all</id>
-  <formats>
-    <format>tar.gz</format>
-  </formats>
-  <fileSets>
-    <fileSet>
-      <directory>target/site</directory>
-      <outputDirectory>docs</outputDirectory>
-    </fileSet>
-    <fileSet>
-      <outputDirectory>/</outputDirectory>
-      <includes>
-        <include>*.txt</include>
-        <include>pom.xml</include>
-      </includes>
-    </fileSet>
-    <fileSet>
-      <directory>conf</directory>
-      <outputDirectory>conf</outputDirectory>
-      <fileMode>0644</fileMode>
-      <directoryMode>0755</directoryMode>
-    </fileSet>
-    <fileSet>
-      <directory>bin</directory>
-      <outputDirectory>bin</outputDirectory>
-      <fileMode>0755</fileMode>
-      <directoryMode>0755</directoryMode>
-    </fileSet>
-    <fileSet>
-      <directory>conf</directory>
-      <outputDirectory>conf</outputDirectory>
-      <fileMode>0644</fileMode>
-      <directoryMode>0755</directoryMode>
-    </fileSet>
-    <fileSet>
-      <directory>src</directory>
-      <outputDirectory>src</outputDirectory>
-      <fileMode>0644</fileMode>
-      <directoryMode>0755</directoryMode>
-    </fileSet>
-    <fileSet>
-      <directory>dev-support</directory>
-      <outputDirectory>dev-support</outputDirectory>
-      <fileMode>0755</fileMode>
-      <directoryMode>0755</directoryMode>
-    </fileSet>
-    <fileSet>
-      <directory>hbase-server/src/main/ruby</directory>
-      <outputDirectory>lib/ruby</outputDirectory>
-      <fileMode>0644</fileMode>
-      <directoryMode>0755</directoryMode>
-    </fileSet>
-    <fileSet>
-      <directory>hbase-server/target/hbase-webapps</directory>
-      <outputDirectory>hbase-webapps</outputDirectory>
-      <fileMode>0644</fileMode>
-      <directoryMode>0755</directoryMode>
-    </fileSet>
-    <fileSet>
-      <directory>hbase-server/target/native</directory>
-      <outputDirectory>native</outputDirectory>
-      <fileMode>0755</fileMode>
-      <directoryMode>0755</directoryMode>
-      <includes>
-        <include>*.so</include>
-      </includes>
-    </fileSet>
-    <fileSet>
-      <directory>${parent.basedir}/hbase-server/target/</directory>
-      <outputDirectory>lib</outputDirectory>
-      <includes>
-        <include>${server.test.jar}</include>
-      </includes>
-      <fileMode>0644</fileMode>
-    </fileSet>
-  </fileSets>
-  <moduleSets>
-    <moduleSet>
-      <useAllReactorProjects>true</useAllReactorProjects>
-      <includes>
-        <include>org.apache.hbase:hbase-*</include>
-      </includes>
-      <sources>
-        <fileSets>
-          <fileSet>
-            <excludes>
-              <exclude>target/</exclude>
-              <exclude>test/</exclude>
-              <exclude>.classpath</exclude>
-              <exclude>.project</exclude>
-              <exclude>.settings/</exclude>
-            </excludes>
-          </fileSet>
-        </fileSets>
-      </sources>
-      <binaries>
-        <outputDirectory>lib</outputDirectory>
-        <unpack>false</unpack>
-      </binaries>
-    </moduleSet>
-  </moduleSets>
-</assembly>
diff --git src/assembly/one.xml src/assembly/one.xml
new file mode 100644
index 0000000..c3b6610
--- /dev/null
+++ src/assembly/one.xml
@@ -0,0 +1,156 @@
+<?xml version="1.0"?>
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1
+                      http://maven.apache.org/xsd/assembly-1.1.xsd">
+  <id>all</id>
+  <formats>
+    <format>tar.gz</format>
+  </formats>
+  <fileSets>
+    <fileSet>
+      <directory>target/site</directory>
+      <outputDirectory>docs</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <outputDirectory>/</outputDirectory>
+      <includes>
+        <include>*.txt</include>
+        <include>pom.xml</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>conf</directory>
+      <outputDirectory>conf</outputDirectory>
+      <fileMode>0644</fileMode>
+      <directoryMode>0755</directoryMode>
+    </fileSet>
+    <fileSet>
+      <directory>bin</directory>
+      <outputDirectory>bin</outputDirectory>
+      <fileMode>0755</fileMode>
+      <directoryMode>0755</directoryMode>
+    </fileSet>
+    <fileSet>
+      <directory>conf</directory>
+      <outputDirectory>conf</outputDirectory>
+      <fileMode>0644</fileMode>
+      <directoryMode>0755</directoryMode>
+    </fileSet>
+    <fileSet>
+      <directory>src</directory>
+      <outputDirectory>src</outputDirectory>
+      <fileMode>0644</fileMode>
+      <directoryMode>0755</directoryMode>
+    </fileSet>
+    <fileSet>
+      <directory>dev-support</directory>
+      <outputDirectory>dev-support</outputDirectory>
+      <fileMode>0755</fileMode>
+      <directoryMode>0755</directoryMode>
+    </fileSet>
+    <fileSet>
+      <directory>hbase-server/src/main/ruby</directory>
+      <outputDirectory>lib/ruby</outputDirectory>
+      <fileMode>0644</fileMode>
+      <directoryMode>0755</directoryMode>
+    </fileSet>
+    <fileSet>
+      <directory>hbase-server/target/hbase-webapps</directory>
+      <outputDirectory>hbase-webapps</outputDirectory>
+      <fileMode>0644</fileMode>
+      <directoryMode>0755</directoryMode>
+    </fileSet>
+    <fileSet>
+      <directory>hbase-server/target/native</directory>
+      <outputDirectory>native</outputDirectory>
+      <fileMode>0755</fileMode>
+      <directoryMode>0755</directoryMode>
+      <includes>
+        <include>*.so</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${parent.basedir}/hbase-server/target/</directory>
+      <outputDirectory>lib</outputDirectory>
+      <includes>
+        <include>${server.test.jar}</include>
+      </includes>
+      <fileMode>0644</fileMode>
+    </fileSet>
+  </fileSets>
+  <moduleSets>
+    <moduleSet>
+      <useAllReactorProjects>true</useAllReactorProjects>
+      <includes>
+        <include>org.apache.hbase:hbase-common</include>
+        <include>org.apache.hbase:hbase-hadoop-compat</include>
+        <include>org.apache.hbase:hbase-hadoop1-compat</include>
+        <include>org.apache.hbase:hbase-server</include>
+      </includes>
+      <sources>
+        <fileSets>
+          <fileSet>
+            <excludes>
+              <exclude>target/</exclude>
+              <exclude>test/</exclude>
+              <exclude>.classpath</exclude>
+              <exclude>.project</exclude>
+              <exclude>.settings/</exclude>
+            </excludes>
+          </fileSet>
+        </fileSets>
+      </sources>
+      <binaries>
+        <outputDirectory>lib</outputDirectory>
+        <unpack>false</unpack>
+      </binaries>
+    </moduleSet>
+  </moduleSets>
+</assembly>
diff --git src/assembly/two.xml src/assembly/two.xml
new file mode 100644
index 0000000..9f5a844
--- /dev/null
+++ src/assembly/two.xml
@@ -0,0 +1,156 @@
+<?xml version="1.0"?>
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1
+                      http://maven.apache.org/xsd/assembly-1.1.xsd">
+  <id>all</id>
+  <formats>
+    <format>tar.gz</format>
+  </formats>
+  <fileSets>
+    <fileSet>
+      <directory>target/site</directory>
+      <outputDirectory>docs</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <outputDirectory>/</outputDirectory>
+      <includes>
+        <include>*.txt</include>
+        <include>pom.xml</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>conf</directory>
+      <outputDirectory>conf</outputDirectory>
+      <fileMode>0644</fileMode>
+      <directoryMode>0755</directoryMode>
+    </fileSet>
+    <fileSet>
+      <directory>bin</directory>
+      <outputDirectory>bin</outputDirectory>
+      <fileMode>0755</fileMode>
+      <directoryMode>0755</directoryMode>
+    </fileSet>
+    <fileSet>
+      <directory>conf</directory>
+      <outputDirectory>conf</outputDirectory>
+      <fileMode>0644</fileMode>
+      <directoryMode>0755</directoryMode>
+    </fileSet>
+    <fileSet>
+      <directory>src</directory>
+      <outputDirectory>src</outputDirectory>
+      <fileMode>0644</fileMode>
+      <directoryMode>0755</directoryMode>
+    </fileSet>
+    <fileSet>
+      <directory>dev-support</directory>
+      <outputDirectory>dev-support</outputDirectory>
+      <fileMode>0755</fileMode>
+      <directoryMode>0755</directoryMode>
+    </fileSet>
+    <fileSet>
+      <directory>hbase-server/src/main/ruby</directory>
+      <outputDirectory>lib/ruby</outputDirectory>
+      <fileMode>0644</fileMode>
+      <directoryMode>0755</directoryMode>
+    </fileSet>
+    <fileSet>
+      <directory>hbase-server/target/hbase-webapps</directory>
+      <outputDirectory>hbase-webapps</outputDirectory>
+      <fileMode>0644</fileMode>
+      <directoryMode>0755</directoryMode>
+    </fileSet>
+    <fileSet>
+      <directory>hbase-server/target/native</directory>
+      <outputDirectory>native</outputDirectory>
+      <fileMode>0755</fileMode>
+      <directoryMode>0755</directoryMode>
+      <includes>
+        <include>*.so</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${parent.basedir}/hbase-server/target/</directory>
+      <outputDirectory>lib</outputDirectory>
+      <includes>
+        <include>${server.test.jar}</include>
+      </includes>
+      <fileMode>0644</fileMode>
+    </fileSet>
+  </fileSets>
+  <moduleSets>
+    <moduleSet>
+      <useAllReactorProjects>true</useAllReactorProjects>
+      <includes>
+        <include>org.apache.hbase:hbase-common</include>
+        <include>org.apache.hbase:hbase-hadoop-compat</include>
+        <include>org.apache.hbase:hbase-hadoop2-compat</include>
+        <include>org.apache.hbase:hbase-server</include>
+      </includes>
+      <sources>
+        <fileSets>
+          <fileSet>
+            <excludes>
+              <exclude>target/</exclude>
+              <exclude>test/</exclude>
+              <exclude>.classpath</exclude>
+              <exclude>.project</exclude>
+              <exclude>.settings/</exclude>
+            </excludes>
+          </fileSet>
+        </fileSets>
+      </sources>
+      <binaries>
+        <outputDirectory>lib</outputDirectory>
+        <unpack>false</unpack>
+      </binaries>
+    </moduleSet>
+  </moduleSets>
+</assembly>