diff --git conf/hadoop-metrics2.properties conf/hadoop-metrics2.properties new file mode 100644 index 0000000..451ec33 --- /dev/null +++ conf/hadoop-metrics2.properties @@ -0,0 +1,5 @@ +# syntax: [prefix].[source|sink|jmx].[instance].[options] +# See package.html for org.apache.hadoop.metrics2 for details + +*.sink.file.class=org.apache.hadoop.metrics2.sink.FileSink + diff --git hbase-hadoop-compat/pom.xml hbase-hadoop-compat/pom.xml new file mode 100644 index 0000000..2511f26 --- /dev/null +++ hbase-hadoop-compat/pom.xml @@ -0,0 +1,81 @@ + + + + + 4.0.0 + + hbase + org.apache.hbase + 0.95-SNAPSHOT + .. + + + hbase-hadoop-compat + HBase - Hadoop Compatibility + + Interfaces to be implemented in order to smooth + over hadoop version differences + + + + + + maven-surefire-plugin + + + + secondPartTestsExecution + test + + test + + + true + + + + + + + + + + + diff --git hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics2/ReplicationMetricsSource.java hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics2/ReplicationMetricsSource.java new file mode 100644 index 0000000..bc509ea --- /dev/null +++ hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics2/ReplicationMetricsSource.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication.regionserver.metrics2; + +/** + * Provides access to gauges and counters. + * Implementers will hide the details of hadoop1 or hadoop2's metrics2 classes and publishing. + */ +public interface ReplicationMetricsSource { + + /** + * Set a gauge to a specific value. + * @param gaugeName the name of the gauge + * @param value the value + */ + public void setGauge(String gaugeName, long value); + + /** + * Add some amount to a gauge. + * @param gaugeName the name of the gauge + * @param delta the amount to change the gauge by. + */ + public void incGauge(String gaugeName, long delta); + + /** + * Subtract some amount to a gauge. + * @param gaugeName the name of the gauge + * @param delta the amount to change the gauge by. + */ + public void decGauge(String gaugeName, long delta); + + /** + * Remove a gauge and no longer announce it. + * @param key Name of the gauge to remove. + */ + public void removeGauge(String key); + + /** + * Add some amount to a counter. + * @param counterName the name of the counter + * @param delta the amount to change the counter by. + */ + public void incCounters(String counterName, long delta); + + /** + * Remove a counter and stop announcing it to metrics2. + * @param key + */ + public void removeCounter(String key); +} diff --git hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics2/ReplicationMetricsSourceFactory.java hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics2/ReplicationMetricsSourceFactory.java new file mode 100644 index 0000000..eaec7e7 --- /dev/null +++ hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics2/ReplicationMetricsSourceFactory.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication.regionserver.metrics2; + +import org.apache.xbean.finder.ResourceFinder; + +import java.io.IOException; + +/** + * Class to load ReplicationMetricsSource from the class path. + * Will only return a singleton instance. + */ +public class ReplicationMetricsSourceFactory { + + private static ReplicationMetricsSource rms = null; + private static ResourceFinder finder = new ResourceFinder("META-INF/services"); + + /** + * Get the singleton instance of ReplicationMetricsSource + * @return the singleton + */ + public static synchronized ReplicationMetricsSource getInstance() { + if (rms == null) { + try { + rms = (ReplicationMetricsSource) finder.findImplementation(ReplicationMetricsSource.class) + .newInstance(); + } catch (IOException e) { + throw new RuntimeException(e); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } catch (InstantiationException e) { + throw new RuntimeException(e); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + + // If there was nothing returned and no exception then throw an exception. + if (rms == null) { + throw new RuntimeException("It appears that no hadoop " + + "compatibility jar is on the classpath"); + } + + } + return rms; + } + +} diff --git hbase-hadoop-compat/src/main/java/org/apache/xbean/finder/ResourceFinder.java hbase-hadoop-compat/src/main/java/org/apache/xbean/finder/ResourceFinder.java new file mode 100644 index 0000000..7e9278f --- /dev/null +++ hbase-hadoop-compat/src/main/java/org/apache/xbean/finder/ResourceFinder.java @@ -0,0 +1,1121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.xbean.finder; + +import java.io.BufferedInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.HttpURLConnection; +import java.net.JarURLConnection; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLConnection; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Vector; +import java.util.jar.JarEntry; +import java.util.jar.JarFile; + +/** + * + * This class is taken from: + * http://svn.apache.org/repos/asf/geronimo/xbean/tags/xbean-3.11.1/xbean-finder/src/main/java/org/apache/xbean/finder/ResourceFinder.java + * + * It is self contained and the jar that normally contains has lots of dependencies. + * + */ +public class ResourceFinder { + + private final URL[] urls; + private final String path; + private final ClassLoader classLoader; + private final List resourcesNotLoaded = new ArrayList(); + + public ResourceFinder(URL... urls) { + this(null, Thread.currentThread().getContextClassLoader(), urls); + } + + public ResourceFinder(String path) { + this(path, Thread.currentThread().getContextClassLoader(), null); + } + + public ResourceFinder(String path, URL... urls) { + this(path, Thread.currentThread().getContextClassLoader(), urls); + } + + public ResourceFinder(String path, ClassLoader classLoader) { + this(path, classLoader, null); + } + + public ResourceFinder(String path, ClassLoader classLoader, URL... urls) { + if (path == null){ + path = ""; + } else if (path.length() > 0 && !path.endsWith("/")) { + path += "/"; + } + this.path = path; + + if (classLoader == null) { + classLoader = Thread.currentThread().getContextClassLoader(); + } + this.classLoader = classLoader; + + for (int i = 0; urls != null && i < urls.length; i++) { + URL url = urls[i]; + if (url == null || isDirectory(url) || url.getProtocol().equals("jar")) { + continue; + } + try { + urls[i] = new URL("jar", "", -1, url.toString() + "!/"); + } catch (MalformedURLException e) { + } + } + this.urls = (urls == null || urls.length == 0)? null : urls; + } + + private static boolean isDirectory(URL url) { + String file = url.getFile(); + return (file.length() > 0 && file.charAt(file.length() - 1) == '/'); + } + + /** + * Returns a list of resources that could not be loaded in the last invoked findAvailable* or + * mapAvailable* methods. + *

+ * The list will only contain entries of resources that match the requirements + * of the last invoked findAvailable* or mapAvailable* methods, but were unable to be + * loaded and included in their results. + *

+ * The list returned is unmodifiable and the results of this method will change + * after each invocation of a findAvailable* or mapAvailable* methods. + *

+ * This method is not thread safe. + */ + public List getResourcesNotLoaded() { + return Collections.unmodifiableList(resourcesNotLoaded); + } + + // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + // + // Find + // + // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + + public URL find(String uri) throws IOException { + String fullUri = path + uri; + + URL resource = getResource(fullUri); + if (resource == null) { + throw new IOException("Could not find resource '" + path + uri + "'"); + } + + return resource; + } + + public List findAll(String uri) throws IOException { + String fullUri = path + uri; + + Enumeration resources = getResources(fullUri); + List list = new ArrayList(); + while (resources.hasMoreElements()) { + URL url = resources.nextElement(); + list.add(url); + } + return list; + } + + + // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + // + // Find String + // + // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + + /** + * Reads the contents of the URL as a {@link String}'s and returns it. + * + * @param uri + * @return a stringified content of a resource + * @throws IOException if a resource pointed out by the uri param could not be find + * @see ClassLoader#getResource(String) + */ + public String findString(String uri) throws IOException { + String fullUri = path + uri; + + URL resource = getResource(fullUri); + if (resource == null) { + throw new IOException("Could not find a resource in : " + fullUri); + } + + return readContents(resource); + } + + /** + * Reads the contents of the found URLs as a list of {@link String}'s and returns them. + * + * @param uri + * @return a list of the content of each resource URL found + * @throws IOException if any of the found URLs are unable to be read. + */ + public List findAllStrings(String uri) throws IOException { + String fulluri = path + uri; + + List strings = new ArrayList(); + + Enumeration resources = getResources(fulluri); + while (resources.hasMoreElements()) { + URL url = resources.nextElement(); + String string = readContents(url); + strings.add(string); + } + return strings; + } + + /** + * Reads the contents of the found URLs as a Strings and returns them. + * Individual URLs that cannot be read are skipped and added to the + * list of 'resourcesNotLoaded' + * + * @param uri + * @return a list of the content of each resource URL found + * @throws IOException if classLoader.getResources throws an exception + */ + public List findAvailableStrings(String uri) throws IOException { + resourcesNotLoaded.clear(); + String fulluri = path + uri; + + List strings = new ArrayList(); + + Enumeration resources = getResources(fulluri); + while (resources.hasMoreElements()) { + URL url = resources.nextElement(); + try { + String string = readContents(url); + strings.add(string); + } catch (IOException notAvailable) { + resourcesNotLoaded.add(url.toExternalForm()); + } + } + return strings; + } + + /** + * Reads the contents of all non-directory URLs immediately under the specified + * location and returns them in a map keyed by the file name. + *

+ * Any URLs that cannot be read will cause an exception to be thrown. + *

+ * Example classpath: + *

+ * META-INF/serializables/one + * META-INF/serializables/two + * META-INF/serializables/three + * META-INF/serializables/four/foo.txt + *

+ * ResourceFinder finder = new ResourceFinder("META-INF/"); + * Map map = finder.mapAvailableStrings("serializables"); + * map.contains("one"); // true + * map.contains("two"); // true + * map.contains("three"); // true + * map.contains("four"); // false + * + * @param uri + * @return a list of the content of each resource URL found + * @throws IOException if any of the urls cannot be read + */ + public Map mapAllStrings(String uri) throws IOException { + Map strings = new HashMap(); + Map resourcesMap = getResourcesMap(uri); + for (Iterator iterator = resourcesMap.entrySet().iterator(); iterator.hasNext();) { + Map.Entry entry = (Map.Entry) iterator.next(); + String name = (String) entry.getKey(); + URL url = (URL) entry.getValue(); + String value = readContents(url); + strings.put(name, value); + } + return strings; + } + + /** + * Reads the contents of all non-directory URLs immediately under the specified + * location and returns them in a map keyed by the file name. + *

+ * Individual URLs that cannot be read are skipped and added to the + * list of 'resourcesNotLoaded' + *

+ * Example classpath: + *

+ * META-INF/serializables/one + * META-INF/serializables/two # not readable + * META-INF/serializables/three + * META-INF/serializables/four/foo.txt + *

+ * ResourceFinder finder = new ResourceFinder("META-INF/"); + * Map map = finder.mapAvailableStrings("serializables"); + * map.contains("one"); // true + * map.contains("two"); // false + * map.contains("three"); // true + * map.contains("four"); // false + * + * @param uri + * @return a list of the content of each resource URL found + * @throws IOException if classLoader.getResources throws an exception + */ + public Map mapAvailableStrings(String uri) throws IOException { + resourcesNotLoaded.clear(); + Map strings = new HashMap(); + Map resourcesMap = getResourcesMap(uri); + for (Iterator iterator = resourcesMap.entrySet().iterator(); iterator.hasNext();) { + Map.Entry entry = (Map.Entry) iterator.next(); + String name = (String) entry.getKey(); + URL url = (URL) entry.getValue(); + try { + String value = readContents(url); + strings.put(name, value); + } catch (IOException notAvailable) { + resourcesNotLoaded.add(url.toExternalForm()); + } + } + return strings; + } + + // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + // + // Find Class + // + // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + + /** + * Executes {@link #findString(String)} assuming the contents URL found is the name of + * a class that should be loaded and returned. + * + * @param uri + * @return + * @throws IOException + * @throws ClassNotFoundException + */ + public Class findClass(String uri) throws IOException, ClassNotFoundException { + String className = findString(uri); + return classLoader.loadClass(className); + } + + /** + * Executes findAllStrings assuming the strings are + * the names of a classes that should be loaded and returned. + *

+ * Any URL or class that cannot be loaded will cause an exception to be thrown. + * + * @param uri + * @return + * @throws IOException + * @throws ClassNotFoundException + */ + public List> findAllClasses(String uri) throws IOException, ClassNotFoundException { + List> classes = new ArrayList>(); + List strings = findAllStrings(uri); + for (String className : strings) { + Class clazz = classLoader.loadClass(className); + classes.add(clazz); + } + return classes; + } + + /** + * Executes findAvailableStrings assuming the strings are + * the names of a classes that should be loaded and returned. + *

+ * Any class that cannot be loaded will be skipped and placed in the + * 'resourcesNotLoaded' collection. + * + * @param uri + * @return + * @throws IOException if classLoader.getResources throws an exception + */ + public List> findAvailableClasses(String uri) throws IOException { + resourcesNotLoaded.clear(); + List> classes = new ArrayList>(); + List strings = findAvailableStrings(uri); + for (String className : strings) { + try { + Class clazz = classLoader.loadClass(className); + classes.add(clazz); + } catch (Exception notAvailable) { + resourcesNotLoaded.add(className); + } + } + return classes; + } + + /** + * Executes mapAllStrings assuming the value of each entry in the + * map is the name of a class that should be loaded. + *

+ * Any class that cannot be loaded will be cause an exception to be thrown. + *

+ * Example classpath: + *

+ * META-INF/xmlparsers/xerces + * META-INF/xmlparsers/crimson + *

+ * ResourceFinder finder = new ResourceFinder("META-INF/"); + * Map map = finder.mapAvailableStrings("xmlparsers"); + * map.contains("xerces"); // true + * map.contains("crimson"); // true + * Class xercesClass = map.get("xerces"); + * Class crimsonClass = map.get("crimson"); + * + * @param uri + * @return + * @throws IOException + * @throws ClassNotFoundException + */ + public Map> mapAllClasses(String uri) throws IOException, ClassNotFoundException { + Map> classes = new HashMap>(); + Map map = mapAllStrings(uri); + for (Map.Entry entry : map.entrySet()) { + String string = entry.getKey(); + String className = entry.getValue(); + Class clazz = classLoader.loadClass(className); + classes.put(string, clazz); + } + return classes; + } + + /** + * Executes mapAvailableStrings assuming the value of each entry in the + * map is the name of a class that should be loaded. + *

+ * Any class that cannot be loaded will be skipped and placed in the + * 'resourcesNotLoaded' collection. + *

+ * Example classpath: + *

+ * META-INF/xmlparsers/xerces + * META-INF/xmlparsers/crimson + *

+ * ResourceFinder finder = new ResourceFinder("META-INF/"); + * Map map = finder.mapAvailableStrings("xmlparsers"); + * map.contains("xerces"); // true + * map.contains("crimson"); // true + * Class xercesClass = map.get("xerces"); + * Class crimsonClass = map.get("crimson"); + * + * @param uri + * @return + * @throws IOException if classLoader.getResources throws an exception + */ + public Map> mapAvailableClasses(String uri) throws IOException { + resourcesNotLoaded.clear(); + Map> classes = new HashMap>(); + Map map = mapAvailableStrings(uri); + for (Map.Entry entry : map.entrySet()) { + String string = entry.getKey(); + String className = entry.getValue(); + try { + Class clazz = classLoader.loadClass(className); + classes.put(string, clazz); + } catch (Exception notAvailable) { + resourcesNotLoaded.add(className); + } + } + return classes; + } + + // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + // + // Find Implementation + // + // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + + /** + * Assumes the class specified points to a file in the classpath that contains + * the name of a class that implements or is a subclass of the specfied class. + *

+ * Any class that cannot be loaded will be cause an exception to be thrown. + *

+ * Example classpath: + *

+ * META-INF/java.io.InputStream # contains the classname org.acme.AcmeInputStream + * META-INF/java.io.OutputStream + *

+ * ResourceFinder finder = new ResourceFinder("META-INF/"); + * Class clazz = finder.findImplementation(java.io.InputStream.class); + * clazz.getName(); // returns "org.acme.AcmeInputStream" + * + * @param interfase a superclass or interface + * @return + * @throws IOException if the URL cannot be read + * @throws ClassNotFoundException if the class found is not loadable + * @throws ClassCastException if the class found is not assignable to the specified superclass or interface + */ + public Class findImplementation(Class interfase) throws IOException, ClassNotFoundException { + String className = findString(interfase.getName()); + Class impl = classLoader.loadClass(className); + if (!interfase.isAssignableFrom(impl)) { + throw new ClassCastException("Class not of type: " + interfase.getName()); + } + return impl; + } + + /** + * Assumes the class specified points to a file in the classpath that contains + * the name of a class that implements or is a subclass of the specfied class. + *

+ * Any class that cannot be loaded or assigned to the specified interface will be cause + * an exception to be thrown. + *

+ * Example classpath: + *

+ * META-INF/java.io.InputStream # contains the classname org.acme.AcmeInputStream + * META-INF/java.io.InputStream # contains the classname org.widget.NeatoInputStream + * META-INF/java.io.InputStream # contains the classname com.foo.BarInputStream + *

+ * ResourceFinder finder = new ResourceFinder("META-INF/"); + * List classes = finder.findAllImplementations(java.io.InputStream.class); + * classes.contains("org.acme.AcmeInputStream"); // true + * classes.contains("org.widget.NeatoInputStream"); // true + * classes.contains("com.foo.BarInputStream"); // true + * + * @param interfase a superclass or interface + * @return + * @throws IOException if the URL cannot be read + * @throws ClassNotFoundException if the class found is not loadable + * @throws ClassCastException if the class found is not assignable to the specified superclass or interface + */ + public List> findAllImplementations(Class interfase) throws IOException, ClassNotFoundException { + List> implementations = new ArrayList>(); + List strings = findAllStrings(interfase.getName()); + for (String className : strings) { + Class impl = classLoader.loadClass(className).asSubclass(interfase); + implementations.add(impl); + } + return implementations; + } + + /** + * Assumes the class specified points to a file in the classpath that contains + * the name of a class that implements or is a subclass of the specfied class. + *

+ * Any class that cannot be loaded or are not assignable to the specified class will be + * skipped and placed in the 'resourcesNotLoaded' collection. + *

+ * Example classpath: + *

+ * META-INF/java.io.InputStream # contains the classname org.acme.AcmeInputStream + * META-INF/java.io.InputStream # contains the classname org.widget.NeatoInputStream + * META-INF/java.io.InputStream # contains the classname com.foo.BarInputStream + *

+ * ResourceFinder finder = new ResourceFinder("META-INF/"); + * List classes = finder.findAllImplementations(java.io.InputStream.class); + * classes.contains("org.acme.AcmeInputStream"); // true + * classes.contains("org.widget.NeatoInputStream"); // true + * classes.contains("com.foo.BarInputStream"); // true + * + * @param interfase a superclass or interface + * @return + * @throws IOException if classLoader.getResources throws an exception + */ + public List> findAvailableImplementations(Class interfase) throws IOException { + resourcesNotLoaded.clear(); + List> implementations = new ArrayList>(); + List strings = findAvailableStrings(interfase.getName()); + for (String className : strings) { + try { + Class impl = classLoader.loadClass(className); + if (interfase.isAssignableFrom(impl)) { + implementations.add(impl.asSubclass(interfase)); + } else { + resourcesNotLoaded.add(className); + } + } catch (Exception notAvailable) { + resourcesNotLoaded.add(className); + } + } + return implementations; + } + + /** + * Assumes the class specified points to a directory in the classpath that holds files + * containing the name of a class that implements or is a subclass of the specfied class. + *

+ * Any class that cannot be loaded or assigned to the specified interface will be cause + * an exception to be thrown. + *

+ * Example classpath: + *

+ * META-INF/java.net.URLStreamHandler/jar + * META-INF/java.net.URLStreamHandler/file + * META-INF/java.net.URLStreamHandler/http + *

+ * ResourceFinder finder = new ResourceFinder("META-INF/"); + * Map map = finder.mapAllImplementations(java.net.URLStreamHandler.class); + * Class jarUrlHandler = map.get("jar"); + * Class fileUrlHandler = map.get("file"); + * Class httpUrlHandler = map.get("http"); + * + * @param interfase a superclass or interface + * @return + * @throws IOException if the URL cannot be read + * @throws ClassNotFoundException if the class found is not loadable + * @throws ClassCastException if the class found is not assignable to the specified superclass or interface + */ + public Map> mapAllImplementations(Class interfase) throws IOException, ClassNotFoundException { + Map> implementations = new HashMap>(); + Map map = mapAllStrings(interfase.getName()); + for (Map.Entry entry : map.entrySet()) { + String string = entry.getKey(); + String className = entry.getValue(); + Class impl = classLoader.loadClass(className).asSubclass(interfase); + implementations.put(string, impl); + } + return implementations; + } + + /** + * Assumes the class specified points to a directory in the classpath that holds files + * containing the name of a class that implements or is a subclass of the specfied class. + *

+ * Any class that cannot be loaded or are not assignable to the specified class will be + * skipped and placed in the 'resourcesNotLoaded' collection. + *

+ * Example classpath: + *

+ * META-INF/java.net.URLStreamHandler/jar + * META-INF/java.net.URLStreamHandler/file + * META-INF/java.net.URLStreamHandler/http + *

+ * ResourceFinder finder = new ResourceFinder("META-INF/"); + * Map map = finder.mapAllImplementations(java.net.URLStreamHandler.class); + * Class jarUrlHandler = map.get("jar"); + * Class fileUrlHandler = map.get("file"); + * Class httpUrlHandler = map.get("http"); + * + * @param interfase a superclass or interface + * @return + * @throws IOException if classLoader.getResources throws an exception + */ + public Map> mapAvailableImplementations(Class interfase) throws IOException { + resourcesNotLoaded.clear(); + Map> implementations = new HashMap>(); + Map map = mapAvailableStrings(interfase.getName()); + for (Map.Entry entry : map.entrySet()) { + String string = entry.getKey(); + String className = entry.getValue(); + try { + Class impl = classLoader.loadClass(className); + if (interfase.isAssignableFrom(impl)) { + implementations.put(string, impl.asSubclass(interfase)); + } else { + resourcesNotLoaded.add(className); + } + } catch (Exception notAvailable) { + resourcesNotLoaded.add(className); + } + } + return implementations; + } + + // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + // + // Find Properties + // + // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + + /** + * Finds the corresponding resource and reads it in as a properties file + *

+ * Example classpath: + *

+ * META-INF/widget.properties + *

+ * ResourceFinder finder = new ResourceFinder("META-INF/"); + * Properties widgetProps = finder.findProperties("widget.properties"); + * + * @param uri + * @return + * @throws IOException if the URL cannot be read or is not in properties file format + */ + public Properties findProperties(String uri) throws IOException { + String fulluri = path + uri; + + URL resource = getResource(fulluri); + if (resource == null) { + throw new IOException("Could not find resource: " + fulluri); + } + + return loadProperties(resource); + } + + /** + * Finds the corresponding resources and reads them in as a properties files + *

+ * Any URL that cannot be read in as a properties file will cause an exception to be thrown. + *

+ * Example classpath: + *

+ * META-INF/app.properties + * META-INF/app.properties + * META-INF/app.properties + *

+ * ResourceFinder finder = new ResourceFinder("META-INF/"); + * List appProps = finder.findAllProperties("app.properties"); + * + * @param uri + * @return + * @throws IOException if the URL cannot be read or is not in properties file format + */ + public List findAllProperties(String uri) throws IOException { + String fulluri = path + uri; + + List properties = new ArrayList(); + + Enumeration resources = getResources(fulluri); + while (resources.hasMoreElements()) { + URL url = resources.nextElement(); + Properties props = loadProperties(url); + properties.add(props); + } + return properties; + } + + /** + * Finds the corresponding resources and reads them in as a properties files + *

+ * Any URL that cannot be read in as a properties file will be added to the + * 'resourcesNotLoaded' collection. + *

+ * Example classpath: + *

+ * META-INF/app.properties + * META-INF/app.properties + * META-INF/app.properties + *

+ * ResourceFinder finder = new ResourceFinder("META-INF/"); + * List appProps = finder.findAvailableProperties("app.properties"); + * + * @param uri + * @return + * @throws IOException if classLoader.getResources throws an exception + */ + public List findAvailableProperties(String uri) throws IOException { + resourcesNotLoaded.clear(); + String fulluri = path + uri; + + List properties = new ArrayList(); + + Enumeration resources = getResources(fulluri); + while (resources.hasMoreElements()) { + URL url = resources.nextElement(); + try { + Properties props = loadProperties(url); + properties.add(props); + } catch (Exception notAvailable) { + resourcesNotLoaded.add(url.toExternalForm()); + } + } + return properties; + } + + /** + * Finds the corresponding resources and reads them in as a properties files + *

+ * Any URL that cannot be read in as a properties file will cause an exception to be thrown. + *

+ * Example classpath: + *

+ - META-INF/jdbcDrivers/oracle.properties + - META-INF/jdbcDrivers/mysql.props + - META-INF/jdbcDrivers/derby + *

+ * ResourceFinder finder = new ResourceFinder("META-INF/"); + * List driversList = finder.findAvailableProperties("jdbcDrivers"); + * Properties oracleProps = driversList.get("oracle.properties"); + * Properties mysqlProps = driversList.get("mysql.props"); + * Properties derbyProps = driversList.get("derby"); + * + * @param uri + * @return + * @throws IOException if the URL cannot be read or is not in properties file format + */ + public Map mapAllProperties(String uri) throws IOException { + Map propertiesMap = new HashMap(); + Map map = getResourcesMap(uri); + for (Iterator iterator = map.entrySet().iterator(); iterator.hasNext();) { + Map.Entry entry = (Map.Entry) iterator.next(); + String string = (String) entry.getKey(); + URL url = (URL) entry.getValue(); + Properties properties = loadProperties(url); + propertiesMap.put(string, properties); + } + return propertiesMap; + } + + /** + * Finds the corresponding resources and reads them in as a properties files + *

+ * Any URL that cannot be read in as a properties file will be added to the + * 'resourcesNotLoaded' collection. + *

+ * Example classpath: + *

+ * META-INF/jdbcDrivers/oracle.properties + * META-INF/jdbcDrivers/mysql.props + * META-INF/jdbcDrivers/derby + *

+ * ResourceFinder finder = new ResourceFinder("META-INF/"); + * List driversList = finder.findAvailableProperties("jdbcDrivers"); + * Properties oracleProps = driversList.get("oracle.properties"); + * Properties mysqlProps = driversList.get("mysql.props"); + * Properties derbyProps = driversList.get("derby"); + * + * @param uri + * @return + * @throws IOException if classLoader.getResources throws an exception + */ + public Map mapAvailableProperties(String uri) throws IOException { + resourcesNotLoaded.clear(); + Map propertiesMap = new HashMap(); + Map map = getResourcesMap(uri); + for (Iterator iterator = map.entrySet().iterator(); iterator.hasNext();) { + Map.Entry entry = (Map.Entry) iterator.next(); + String string = (String) entry.getKey(); + URL url = (URL) entry.getValue(); + try { + Properties properties = loadProperties(url); + propertiesMap.put(string, properties); + } catch (Exception notAvailable) { + resourcesNotLoaded.add(url.toExternalForm()); + } + } + return propertiesMap; + } + + // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + // + // Map Resources + // + // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + + public Map getResourcesMap(String uri) throws IOException { + String basePath = path + uri; + + Map resources = new HashMap(); + if (!basePath.endsWith("/")) { + basePath += "/"; + } + Enumeration urls = getResources(basePath); + + while (urls.hasMoreElements()) { + URL location = urls.nextElement(); + + try { + if (location.getProtocol().equals("jar")) { + + readJarEntries(location, basePath, resources); + + } else if (location.getProtocol().equals("file")) { + + readDirectoryEntries(location, resources); + + } + } catch (Exception e) { + } + } + + return resources; + } + + private static void readDirectoryEntries(URL location, Map resources) throws MalformedURLException { + File dir = new File(decode(location.getPath())); + if (dir.isDirectory()) { + File[] files = dir.listFiles(); + for (File file : files) { + if (!file.isDirectory()) { + String name = file.getName(); + URL url = file.toURI().toURL(); + resources.put(name, url); + } + } + } + } + + private static void readJarEntries(URL location, String basePath, Map resources) throws IOException { + JarURLConnection conn = (JarURLConnection) location.openConnection(); + JarFile jarfile = null; + jarfile = conn.getJarFile(); + + Enumeration entries = jarfile.entries(); + while (entries != null && entries.hasMoreElements()) { + JarEntry entry = entries.nextElement(); + String name = entry.getName(); + + if (entry.isDirectory() || !name.startsWith(basePath) || name.length() == basePath.length()) { + continue; + } + + name = name.substring(basePath.length()); + + if (name.contains("/")) { + continue; + } + + URL resource = new URL(location, name); + resources.put(name, resource); + } + } + + private Properties loadProperties(URL resource) throws IOException { + InputStream in = resource.openStream(); + + BufferedInputStream reader = null; + try { + reader = new BufferedInputStream(in); + Properties properties = new Properties(); + properties.load(reader); + + return properties; + } finally { + try { + in.close(); + reader.close(); + } catch (Exception e) { + } + } + } + + private String readContents(URL resource) throws IOException { + InputStream in = resource.openStream(); + BufferedInputStream reader = null; + StringBuffer sb = new StringBuffer(); + + try { + reader = new BufferedInputStream(in); + + int b = reader.read(); + while (b != -1) { + sb.append((char) b); + b = reader.read(); + } + + return sb.toString().trim(); + } finally { + try { + in.close(); + reader.close(); + } catch (Exception e) { + } + } + } + + public URL getResource(String fullUri) { + if (urls == null){ + return classLoader.getResource(fullUri); + } + return findResource(fullUri, urls); + } + + private Enumeration getResources(String fulluri) throws IOException { + if (urls == null) { + return classLoader.getResources(fulluri); + } + Vector resources = new Vector(); + for (URL url : urls) { + URL resource = findResource(fulluri, url); + if (resource != null){ + resources.add(resource); + } + } + return resources.elements(); + } + + private URL findResource(String resourceName, URL... search) { + for (int i = 0; i < search.length; i++) { + URL currentUrl = search[i]; + if (currentUrl == null) { + continue; + } + + try { + String protocol = currentUrl.getProtocol(); + if (protocol.equals("jar")) { + /* + * If the connection for currentUrl or resURL is + * used, getJarFile() will throw an exception if the + * entry doesn't exist. + */ + URL jarURL = ((JarURLConnection) currentUrl.openConnection()).getJarFileURL(); + JarFile jarFile; + JarURLConnection juc; + try { + juc = (JarURLConnection) new URL("jar", "", jarURL.toExternalForm() + "!/").openConnection(); + jarFile = juc.getJarFile(); + } catch (IOException e) { + // Don't look for this jar file again + search[i] = null; + throw e; + } + + try { + juc = (JarURLConnection) new URL("jar", "", jarURL.toExternalForm() + "!/").openConnection(); + jarFile = juc.getJarFile(); + String entryName; + if (currentUrl.getFile().endsWith("!/")) { + entryName = resourceName; + } else { + String file = currentUrl.getFile(); + int sepIdx = file.lastIndexOf("!/"); + if (sepIdx == -1) { + // Invalid URL, don't look here again + search[i] = null; + continue; + } + sepIdx += 2; + StringBuffer sb = new StringBuffer(file.length() - sepIdx + resourceName.length()); + sb.append(file.substring(sepIdx)); + sb.append(resourceName); + entryName = sb.toString(); + } + if (entryName.equals("META-INF/") && jarFile.getEntry("META-INF/MANIFEST.MF") != null) { + return targetURL(currentUrl, "META-INF/MANIFEST.MF"); + } + if (jarFile.getEntry(entryName) != null) { + return targetURL(currentUrl, resourceName); + } + } finally { + if (!juc.getUseCaches()) { + try { + jarFile.close(); + } catch (Exception e) { + } + } + } + + } else if (protocol.equals("file")) { + String baseFile = currentUrl.getFile(); + String host = currentUrl.getHost(); + int hostLength = 0; + if (host != null) { + hostLength = host.length(); + } + StringBuffer buf = new StringBuffer(2 + hostLength + baseFile.length() + resourceName.length()); + + if (hostLength > 0) { + buf.append("//").append(host); + } + // baseFile always ends with '/' + buf.append(baseFile); + String fixedResName = resourceName; + // Do not create a UNC path, i.e. \\host + while (fixedResName.startsWith("/") || fixedResName.startsWith("\\")) { + fixedResName = fixedResName.substring(1); + } + buf.append(fixedResName); + String filename = buf.toString(); + File file = new File(filename); + File file2 = new File(decode(filename)); + + if (file.exists() || file2.exists()) { + return targetURL(currentUrl, fixedResName); + } + } else { + URL resourceURL = targetURL(currentUrl, resourceName); + URLConnection urlConnection = resourceURL.openConnection(); + + try { + urlConnection.getInputStream().close(); + } catch (SecurityException e) { + return null; + } + // HTTP can return a stream on a non-existent file + // So check for the return code; + if (!resourceURL.getProtocol().equals("http")) { + return resourceURL; + } + + int code = ((HttpURLConnection) urlConnection).getResponseCode(); + if (code >= 200 && code < 300) { + return resourceURL; + } + } + } catch (MalformedURLException e) { + // Keep iterating through the URL list + } catch (IOException e) { + } catch (SecurityException e) { + } + } + return null; + } + + private URL targetURL(URL base, String name) throws MalformedURLException { + StringBuffer sb = new StringBuffer(base.getFile().length() + name.length()); + sb.append(base.getFile()); + sb.append(name); + String file = sb.toString(); + return new URL(base.getProtocol(), base.getHost(), base.getPort(), file, null); + } + + public static String decode(String fileName) { + if (fileName.indexOf('%') == -1) return fileName; + + StringBuilder result = new StringBuilder(fileName.length()); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + + for (int i = 0; i < fileName.length();) { + char c = fileName.charAt(i); + + if (c == '%') { + out.reset(); + do { + if (i + 2 >= fileName.length()) { + throw new IllegalArgumentException("Incomplete % sequence at: " + i); + } + + int d1 = Character.digit(fileName.charAt(i + 1), 16); + int d2 = Character.digit(fileName.charAt(i + 2), 16); + + if (d1 == -1 || d2 == -1) { + throw new IllegalArgumentException("Invalid % sequence (" + fileName.substring(i, i + 3) + ") at: " + String.valueOf(i)); + } + + out.write((byte) ((d1 << 4) + d2)); + + i += 3; + + } while (i < fileName.length() && fileName.charAt(i) == '%'); + + + result.append(out.toString()); + + continue; + } else { + result.append(c); + } + + i++; + } + return result.toString(); + } + +} diff --git hbase-hadoop1-compat/pom.xml hbase-hadoop1-compat/pom.xml new file mode 100644 index 0000000..fcddcf2 --- /dev/null +++ hbase-hadoop1-compat/pom.xml @@ -0,0 +1,103 @@ + + + 4.0.0 + + hbase + org.apache.hbase + 0.95-SNAPSHOT + .. + + + hbase-hadoop1-compat + HBase - Hadoop One Compatibility + + Interfaces to be implemented in order to smooth + over hadoop version differences + + + + + + maven-surefire-plugin + + + + secondPartTestsExecution + test + + test + + + true + + + + + + + + + + + org.apache.hbase + hbase-hadoop-compat + + + org.apache.hadoop + hadoop-core + ${hadoop-one.version} + true + + + hsqldb + hsqldb + + + net.sf.kosmosfs + kfs + + + org.eclipse.jdt + core + + + net.java.dev.jets3t + jets3t + + + oro + oro + + + + + org.apache.hadoop + hadoop-test + ${hadoop-one.version} + true + test + + + + diff --git hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics2/ReplicationMetricsSourceImpl.java hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics2/ReplicationMetricsSourceImpl.java new file mode 100644 index 0000000..6f3111d --- /dev/null +++ hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics2/ReplicationMetricsSourceImpl.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication.regionserver.metrics2; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.MetricsBuilder; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricMutableCounterLong; +import org.apache.hadoop.metrics2.lib.MetricMutableGaugeLong; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * Hadoop1 implementation of ReplicationMetricsSource. + * This provides access to metrics gauges and counters. + */ +public class ReplicationMetricsSourceImpl implements ReplicationMetricsSource, MetricsSource { + + public static final Log LOG = LogFactory.getLog(ReplicationMetricsSourceImpl.class); + public static final String METRICS_NAME = "ReplicationMetrics"; + private static final String METRICS_CONTEXT = "replicationmetrics"; + + private ConcurrentMap + gauges = new ConcurrentHashMap(); + private ConcurrentMap counters = + new ConcurrentHashMap(); + + ReplicationMetricsSourceImpl() { + DefaultMetricsSystem.initialize("hbase"); + DefaultMetricsSystem.registerSource(METRICS_NAME, "Metrics about hbase replication", this); + } + + /** + * Set a single gauge to a value. + * + * @param gaugeName gauge name + * @param value the new value of the gauge. + */ + public void setGauge(String gaugeName, long value) { + MetricMutableGaugeLong gaugeInt = getLongGauge(gaugeName, value); + gaugeInt.set(value); + } + + /** + * Add some amount to a gauge. + * + * @param gaugeName The name of the gauge to increment. + * @param delta The amount to increment the gauge by. + */ + public void incGauge(String gaugeName, long delta) { + MetricMutableGaugeLong gaugeInt = getLongGauge(gaugeName, 0l); + gaugeInt.incr(delta); + } + + /** + * Decrease the value of a named gauge. + * + * @param gaugeName The name of the gauge. + * @param delta the ammount to subtract from a gauge value. + */ + public void decGauge(String gaugeName, long delta) { + MetricMutableGaugeLong gaugeInt = getLongGauge(gaugeName, 0l); + gaugeInt.decr(delta); + } + + /** + * Increment a named counter by some value. + * + * @param key the name of the counter + * @param delta the ammount to increment + */ + public void incCounters(String key, long delta) { + MetricMutableCounterLong counter = getLongCounter(key, 0l); + counter.incr(delta); + + } + + /** + * Remove a named gauge. + * + * @param key + */ + public void removeGauge(String key) { + gauges.remove(key); + } + + /** + * Remove a named counter. + * + * @param key + */ + public void removeCounter(String key) { + counters.remove(key); + } + + /** + * Method to export all the metrics. + * + * @param metricsBuilder Builder to accept metrics + * @param all push all or only changed? + */ + @Override + public void getMetrics(MetricsBuilder metricsBuilder, boolean all) { + + MetricsRecordBuilder rb = metricsBuilder.addRecord(METRICS_NAME).setContext(METRICS_CONTEXT); + + for (Map.Entry entry : counters.entrySet()) { + entry.getValue().snapshot(rb, all); + } + for (Map.Entry entry : gauges.entrySet()) { + entry.getValue().snapshot(rb, all); + } + + } + + /** + * Get a MetricMutableGaugeLong from the storage. If it is not there atomically put it. + * + * @param gaugeName name of the gauge to create or get. + * @param potentialStartingValue value of the new counter if we have to create it. + * @return + */ + private MetricMutableGaugeLong getLongGauge(String gaugeName, long potentialStartingValue) { + //Try and get the guage. + MetricMutableGaugeLong gaugeInt = gauges.get(gaugeName); + + //If it's not there then try and put a new one in the storage. + if (gaugeInt == null) { + + //Create the potential new gauge. + MetricMutableGaugeLong newGauge = new MetricMutableGaugeLong(gaugeName, "", + potentialStartingValue); + + // Try and put the gauge in. This is atomic. + gaugeInt = gauges.putIfAbsent(gaugeName, newGauge); + + //If the value we get back is null then the put was successful and we will return that. + //otherwise gaugeInt should contain the thing that was in before the put could be completed. + if (gaugeInt == null) { + gaugeInt = newGauge; + } + } + return gaugeInt; + } + + /** + * Get a MetricMutableCounterLong from the storage. If it is not there atomically put it. + * + * @param counterName Name of the counter to get + * @param potentialStartingValue starting value if we have to create a new counter + * @return + */ + private MetricMutableCounterLong getLongCounter(String counterName, long potentialStartingValue) { + //See getLongGauge for description on how this works. + MetricMutableCounterLong counter = counters.get(counterName); + if (counter == null) { + MetricMutableCounterLong newCounter = + new MetricMutableCounterLong(counterName, "", potentialStartingValue); + counter = counters.putIfAbsent(counterName, newCounter); + if (counter == null) { + counter = newCounter; + } + } + return counter; + } +} diff --git hbase-hadoop1-compat/src/main/resources/META-INF/services/org.apache.hadoop.hbase.replication.regionserver.metrics2.ReplicationMetricsSource hbase-hadoop1-compat/src/main/resources/META-INF/services/org.apache.hadoop.hbase.replication.regionserver.metrics2.ReplicationMetricsSource new file mode 100644 index 0000000..b49b27a --- /dev/null +++ hbase-hadoop1-compat/src/main/resources/META-INF/services/org.apache.hadoop.hbase.replication.regionserver.metrics2.ReplicationMetricsSource @@ -0,0 +1 @@ +org.apache.hadoop.hbase.replication.regionserver.metrics2.ReplicationMetricsSourceImpl \ No newline at end of file diff --git hbase-hadoop2-compat/pom.xml hbase-hadoop2-compat/pom.xml new file mode 100644 index 0000000..bed591c --- /dev/null +++ hbase-hadoop2-compat/pom.xml @@ -0,0 +1,105 @@ + + + 4.0.0 + + hbase + org.apache.hbase + 0.95-SNAPSHOT + .. + + + hbase-hadoop2-compat + HBase - Hadoop Two Compatibility + + Interfaces to be implemented in order to smooth + over hadoop version differences + + + + + + maven-surefire-plugin + + + + secondPartTestsExecution + test + + test + + + true + + + + + + maven-dependency-plugin + + + create-mrapp-generated-classpath + generate-test-resources + + build-classpath + + + + + ${project.build.directory}/test-classes/mrapp-generated-classpath + + + + + + + + + + + + org.apache.hbase + hbase-hadoop-compat + + + org.apache.hadoop + hadoop-client + ${hadoop-two.version} + + + org.apache.hadoop + hadoop-annotations + ${hadoop-two.version} + + + + org.apache.hadoop + hadoop-minicluster + ${hadoop-two.version} + + + diff --git hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics2/ReplicationMetricsSourceImpl.java hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics2/ReplicationMetricsSourceImpl.java new file mode 100644 index 0000000..6dce8d4 --- /dev/null +++ hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics2/ReplicationMetricsSourceImpl.java @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication.regionserver.metrics2; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.lib.*; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * Hadoop2 implementation of ReplicationMetricsSource. + * This provides access to metrics gauges and counters. + */ +public class ReplicationMetricsSourceImpl implements ReplicationMetricsSource, MetricsSource { + + public static final Log LOG = LogFactory.getLog(ReplicationMetricsSourceImpl.class); + public static final String METRICS_NAME = "ReplicationMetrics"; + private static final String METRICS_CONTEXT = "replicationmetrics"; + + private ConcurrentMap + gauges = new ConcurrentHashMap(); + private ConcurrentMap counters = + new ConcurrentHashMap(); + + private MetricsRegistry mr; + + + ReplicationMetricsSourceImpl() { + DefaultMetricsSystem.initialize("hbase"); + DefaultMetricsSystem.instance().register(METRICS_NAME, "Metrics about HBase replication", this); + mr = new MetricsRegistry(METRICS_NAME); + } + + /** + * Set a single gauge to a value. + * + * @param gaugeName gauge name + * @param value the new value of the gauge. + */ + public void setGauge(String gaugeName, long value) { + MutableGaugeLong gaugeInt = getLongGauge(gaugeName, value); + gaugeInt.set(value); + } + + /** + * Add some amount to a gauge. + * + * @param gaugeName The name of the gauge to increment. + * @param delta The amount to increment the gauge by. + */ + public void incGauge(String gaugeName, long delta) { + MutableGaugeLong gaugeInt = getLongGauge(gaugeName, 0l); + gaugeInt.incr(delta); + } + + /** + * Decrease the value of a named gauge. + * + * @param gaugeName The name of the gauge. + * @param delta the ammount to subtract from a gauge value. + */ + public void decGauge(String gaugeName, long delta) { + MutableGaugeLong gaugeInt = getLongGauge(gaugeName, 0l); + gaugeInt.decr(delta); + } + + /** + * Increment a named counter by some value. + * + * @param key the name of the counter + * @param delta the ammount to increment + */ + public void incCounters(String key, long delta) { + MutableCounterLong counter = getLongCounter(key, 0l); + counter.incr(delta); + + } + + /** + * Remove a named gauge. + * + * @param key + */ + public void removeGauge(String key) { + gauges.remove(key); + } + + /** + * Remove a named counter. + * + * @param key + */ + public void removeCounter(String key) { + counters.remove(key); + } + + + + @Override + public void getMetrics(MetricsCollector metricsCollector, boolean all) { + MetricsRecordBuilder rb = metricsCollector.addRecord(METRICS_NAME).setContext(METRICS_CONTEXT); + + for (Map.Entry entry : counters.entrySet()) { + entry.getValue().snapshot(rb, all); + } + for (Map.Entry entry : gauges.entrySet()) { + entry.getValue().snapshot(rb, all); + } + + } + + + /** + * Get a MetricMutableGaugeLong from the storage. If it is not there atomically put it. + * + * @param gaugeName name of the gauge to create or get. + * @param potentialStartingValue value of the new counter if we have to create it. + * @return + */ + private MutableGaugeLong getLongGauge(String gaugeName, long potentialStartingValue) { + //Try and get the guage. + MutableGaugeLong gaugeInt = gauges.get(gaugeName); + + //If it's not there then try and put a new one in the storage. + if (gaugeInt == null) { + + //Create the potential new gauge. + MutableGaugeLong newGauge = mr.newGauge(gaugeName, "", potentialStartingValue); + + // Try and put the gauge in. This is atomic. + gaugeInt = gauges.putIfAbsent(gaugeName, newGauge); + + //If the value we get back is null then the put was successful and we will return that. + //otherwise gaugeInt should contain the thing that was in before the put could be completed. + if (gaugeInt == null) { + gaugeInt = newGauge; + } + } + return gaugeInt; + } + + /** + * Get a MetricMutableCounterLong from the storage. If it is not there atomically put it. + * + * @param counterName Name of the counter to get + * @param potentialStartingValue starting value if we have to create a new counter + * @return + */ + private MutableCounterLong getLongCounter(String counterName, long potentialStartingValue) { + //See getLongGauge for description on how this works. + MutableCounterLong counter = counters.get(counterName); + if (counter == null) { + MutableCounterLong newCounter = mr.newCounter(counterName, "", potentialStartingValue); + counter = counters.putIfAbsent(counterName, newCounter); + if (counter == null) { + counter = newCounter; + } + } + return counter; + } +} diff --git hbase-hadoop2-compat/src/main/resources/META-INF/services/org.apache.hadoop.hbase.replication.regionserver.metrics2.ReplicationMetricsSource hbase-hadoop2-compat/src/main/resources/META-INF/services/org.apache.hadoop.hbase.replication.regionserver.metrics2.ReplicationMetricsSource new file mode 100644 index 0000000..b49b27a --- /dev/null +++ hbase-hadoop2-compat/src/main/resources/META-INF/services/org.apache.hadoop.hbase.replication.regionserver.metrics2.ReplicationMetricsSource @@ -0,0 +1 @@ +org.apache.hadoop.hbase.replication.regionserver.metrics2.ReplicationMetricsSourceImpl \ No newline at end of file diff --git hbase-server/pom.xml hbase-server/pom.xml index 4359d9c..850aa03 100644 --- hbase-server/pom.xml +++ hbase-server/pom.xml @@ -288,6 +288,15 @@ org.apache.hbase hbase-common + + org.apache.hbase + hbase-hadoop-compat + + + org.apache.hbase + ${compat.module} + ${project.version} + io.netty diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index 0b6987e..3016e07 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -19,13 +19,6 @@ */ package org.apache.hadoop.hbase.replication.regionserver; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; -import java.util.Map.Entry; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -39,8 +32,16 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.replication.regionserver.metrics2.ReplicationSinkMetrics; import org.apache.hadoop.hbase.util.Bytes; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.TreeMap; + /** * This class is responsible for replicating the edits coming * from another cluster. @@ -133,7 +134,7 @@ public class ReplicationSink { } this.metrics.setAgeOfLastAppliedOp( entries[entries.length-1].getKey().getWriteTime()); - this.metrics.appliedBatchesRate.inc(1); + this.metrics.applyBatch(entries.length); LOG.info("Total replicated: " + totalReplicated); } catch (IOException ex) { LOG.error("Unable to accept edit because:", ex); @@ -173,7 +174,6 @@ public class ReplicationSink { try { table = this.pool.getTable(tableName); table.batch(rows); - this.metrics.appliedOpsRate.inc(rows.size()); } catch (InterruptedException ix) { throw new IOException(ix); } finally { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkMetrics.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkMetrics.java deleted file mode 100644 index bf324e2..0000000 --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkMetrics.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication.regionserver; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hbase.metrics.MetricsRate; -import org.apache.hadoop.metrics.MetricsContext; -import org.apache.hadoop.metrics.MetricsRecord; -import org.apache.hadoop.metrics.MetricsUtil; -import org.apache.hadoop.metrics.Updater; -import org.apache.hadoop.metrics.jvm.JvmMetrics; -import org.apache.hadoop.metrics.util.MetricsIntValue; -import org.apache.hadoop.metrics.util.MetricsLongValue; -import org.apache.hadoop.metrics.util.MetricsRegistry; - -/** - * This class is for maintaining the various replication statistics - * for a sink and publishing them through the metrics interfaces. - */ -@InterfaceAudience.Private -public class ReplicationSinkMetrics implements Updater { - private final MetricsRecord metricsRecord; - private MetricsRegistry registry = new MetricsRegistry(); - - /** Rate of operations applied by the sink */ - public final MetricsRate appliedOpsRate = - new MetricsRate("appliedOpsRate", registry); - - /** Rate of batches (of operations) applied by the sink */ - public final MetricsRate appliedBatchesRate = - new MetricsRate("appliedBatchesRate", registry); - - /** Age of the last operation that was applied by the sink */ - private final MetricsLongValue ageOfLastAppliedOp = - new MetricsLongValue("ageOfLastAppliedOp", registry); - - /** - * Constructor used to register the metrics - */ - public ReplicationSinkMetrics() { - MetricsContext context = MetricsUtil.getContext("hbase"); - String name = Thread.currentThread().getName(); - metricsRecord = MetricsUtil.createRecord(context, "replication"); - metricsRecord.setTag("RegionServer", name); - context.registerUpdater(this); - // export for JMX - new ReplicationStatistics(this.registry, "ReplicationSink"); - } - - /** - * Set the age of the last edit that was applied - * @param timestamp write time of the edit - */ - public void setAgeOfLastAppliedOp(long timestamp) { - ageOfLastAppliedOp.set(System.currentTimeMillis() - timestamp); - } - @Override - public void doUpdates(MetricsContext metricsContext) { - synchronized (this) { - this.appliedOpsRate.pushMetric(this.metricsRecord); - this.appliedBatchesRate.pushMetric(this.metricsRecord); - this.ageOfLastAppliedOp.pushMetric(this.metricsRecord); - } - this.metricsRecord.update(); - } -} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 2c328ea..28b997d 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.ReplicationZookeeper; +import org.apache.hadoop.hbase.replication.regionserver.metrics2.ReplicationSourceMetrics; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; @@ -238,7 +239,7 @@ public class ReplicationSource extends Thread @Override public void enqueueLog(Path log) { this.queue.put(log); - this.metrics.sizeOfLogQueue.set(queue.size()); + this.metrics.setSizeOfLogQueue(queue.size()); } @Override @@ -246,6 +247,7 @@ public class ReplicationSource extends Thread connectToPeers(); // We were stopped while looping to connect to sinks, just abort if (!this.isActive()) { + metrics.clear(); return; } // delay this until we are in an asynchronous thread @@ -376,6 +378,7 @@ public class ReplicationSource extends Thread } } LOG.debug("Source exiting " + peerId); + metrics.clear(); } /** @@ -393,7 +396,7 @@ public class ReplicationSource extends Thread HLog.Entry entry = this.reader.next(this.entriesArray[currentNbEntries]); while (entry != null) { WALEdit edit = entry.getEdit(); - this.metrics.logEditsReadRate.inc(1); + this.metrics.incrLogEditsRead(); seenEntries++; // Remove all KVs that should not be replicated HLogKey logKey = entry.getKey(); @@ -415,7 +418,7 @@ public class ReplicationSource extends Thread currentNbOperations += countDistinctRowKeys(edit); currentNbEntries++; } else { - this.metrics.logEditsFilteredRate.inc(1); + this.metrics.incrLogEditsFiltered(); } } // Stop if too many entries or too big @@ -455,7 +458,7 @@ public class ReplicationSource extends Thread try { if (this.currentPath == null) { this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS); - this.metrics.sizeOfLogQueue.set(queue.size()); + this.metrics.setSizeOfLogQueue(queue.size()); } } catch (InterruptedException e) { LOG.warn("Interrupted while reading edits", e); @@ -616,9 +619,7 @@ public class ReplicationSource extends Thread this.lastLoggedPosition = this.position; } this.totalReplicatedEdits += currentNbEntries; - this.metrics.shippedBatchesRate.inc(1); - this.metrics.shippedOpsRate.inc( - this.currentNbOperations); + this.metrics.shipBatch(this.currentNbOperations); this.metrics.setAgeOfLastShippedOp( this.entriesArray[this.entriesArray.length-1].getKey().getWriteTime()); LOG.debug("Replicated in total: " + this.totalReplicatedEdits); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceMetrics.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceMetrics.java deleted file mode 100644 index 543e15d..0000000 --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceMetrics.java +++ /dev/null @@ -1,125 +0,0 @@ -/** - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication.regionserver; -import java.io.UnsupportedEncodingException; -import java.net.URLEncoder; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hbase.metrics.MetricsRate; -import org.apache.hadoop.metrics.MetricsContext; -import org.apache.hadoop.metrics.MetricsRecord; -import org.apache.hadoop.metrics.MetricsUtil; -import org.apache.hadoop.metrics.Updater; -import org.apache.hadoop.metrics.jvm.JvmMetrics; -import org.apache.hadoop.metrics.util.MetricsIntValue; -import org.apache.hadoop.metrics.util.MetricsLongValue; -import org.apache.hadoop.metrics.util.MetricsRegistry; - -/** - * This class is for maintaining the various replication statistics - * for a source and publishing them through the metrics interfaces. - */ -@InterfaceAudience.Private -public class ReplicationSourceMetrics implements Updater { - private final MetricsRecord metricsRecord; - private MetricsRegistry registry = new MetricsRegistry(); - - /** Rate of shipped operations by the source */ - public final MetricsRate shippedOpsRate = - new MetricsRate("shippedOpsRate", registry); - - /** Rate of shipped batches by the source */ - public final MetricsRate shippedBatchesRate = - new MetricsRate("shippedBatchesRate", registry); - - /** Rate of log entries (can be multiple Puts) read from the logs */ - public final MetricsRate logEditsReadRate = - new MetricsRate("logEditsReadRate", registry); - - /** Rate of log entries filtered by the source */ - public final MetricsRate logEditsFilteredRate = - new MetricsRate("logEditsFilteredRate", registry); - - /** Age of the last operation that was shipped by the source */ - private final MetricsLongValue ageOfLastShippedOp = - new MetricsLongValue("ageOfLastShippedOp", registry); - - /** - * Current size of the queue of logs to replicate, - * excluding the one being processed at the moment - */ - public final MetricsIntValue sizeOfLogQueue = - new MetricsIntValue("sizeOfLogQueue", registry); - - // It's a little dirty to preset the age to now since if we fail - // to replicate the very first time then it will show that age instead - // of nothing (although that might not be good either). - private long lastTimestampForAge = System.currentTimeMillis(); - - /** - * Constructor used to register the metrics - * @param id Name of the source this class is monitoring - */ - public ReplicationSourceMetrics(String id) { - MetricsContext context = MetricsUtil.getContext("hbase"); - String name = Thread.currentThread().getName(); - metricsRecord = MetricsUtil.createRecord(context, "replication"); - metricsRecord.setTag("RegionServer", name); - context.registerUpdater(this); - try { - id = URLEncoder.encode(id, "UTF8"); - } catch (UnsupportedEncodingException e) { - id = "CAN'T ENCODE UTF8"; - } - // export for JMX - new ReplicationStatistics(this.registry, "ReplicationSource for " + id); - } - - /** - * Set the age of the last edit that was shipped - * @param timestamp write time of the edit - */ - public void setAgeOfLastShippedOp(long timestamp) { - lastTimestampForAge = timestamp; - ageOfLastShippedOp.set(System.currentTimeMillis() - lastTimestampForAge); - } - - /** - * Convenience method to use the last given timestamp to refresh the age - * of the last edit. Used when replication fails and need to keep that - * metric accurate. - */ - public void refreshAgeOfLastShippedOp() { - setAgeOfLastShippedOp(lastTimestampForAge); - } - - @Override - public void doUpdates(MetricsContext metricsContext) { - synchronized (this) { - this.shippedOpsRate.pushMetric(this.metricsRecord); - this.shippedBatchesRate.pushMetric(this.metricsRecord); - this.logEditsReadRate.pushMetric(this.metricsRecord); - this.logEditsFilteredRate.pushMetric(this.metricsRecord); - this.ageOfLastShippedOp.pushMetric(this.metricsRecord); - this.sizeOfLogQueue.pushMetric(this.metricsRecord); - } - this.metricsRecord.update(); - } -} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatistics.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatistics.java deleted file mode 100644 index ceeff5f..0000000 --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatistics.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication.regionserver; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hbase.metrics.MetricsMBeanBase; -import org.apache.hadoop.metrics.util.MBeanUtil; -import org.apache.hadoop.metrics.util.MetricsRegistry; - -import javax.management.ObjectName; - -/** - * Exports metrics recorded by {@link ReplicationSourceMetrics} as an MBean - * for JMX monitoring. - */ -@InterfaceAudience.Private -public class ReplicationStatistics extends MetricsMBeanBase { - - private final ObjectName mbeanName; - - /** - * Constructor to register the MBean - * @param registry which rehistry to use - * @param name name to get to this bean - */ - public ReplicationStatistics(MetricsRegistry registry, String name) { - super(registry, name); - mbeanName = MBeanUtil.registerMBean("Replication", name, this); - } -} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics2/ReplicationSinkMetrics.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics2/ReplicationSinkMetrics.java new file mode 100644 index 0000000..8f9b9a5 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics2/ReplicationSinkMetrics.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication.regionserver.metrics2; + +import org.apache.hadoop.classification.InterfaceAudience; + +import java.util.Iterator; +import java.util.ServiceLoader; + +/** + * This class is for maintaining the various replication statistics for a sink and publishing them + * through the metrics interfaces. + */ +@InterfaceAudience.Private +public class ReplicationSinkMetrics { + + public static final String SINK_AGE_OF_LAST_APPLIED_OP = "sink.ageOfLastAppliedOp"; + public static final String SINK_APPLIED_BATCHES = "sink.appliedBatches"; + public static final String SINK_APPLIED_OPS = "sink.appliedOps"; + + private ReplicationMetricsSource rms; + + public ReplicationSinkMetrics() { + rms = ReplicationMetricsSourceFactory.getInstance(); + } + + /** + * Set the age of the last applied operation + * + * @param timestamp The timestamp of the last operation applied. + */ + public void setAgeOfLastAppliedOp(long timestamp) { + long age = System.currentTimeMillis() - timestamp; + rms.setGauge(SINK_AGE_OF_LAST_APPLIED_OP, age); + } + + /** + * Convience method to change metrics when a batch of operations are applied. + * + * @param batchSize + */ + public void applyBatch(long batchSize) { + rms.incCounters(SINK_APPLIED_BATCHES, 1); + rms.incCounters(SINK_APPLIED_OPS, batchSize); + } + +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics2/ReplicationSourceMetrics.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics2/ReplicationSourceMetrics.java new file mode 100644 index 0000000..85d6057 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics2/ReplicationSourceMetrics.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication.regionserver.metrics2; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; + +import java.util.ServiceLoader; + +/** + * This class is for maintaining the various replication statistics for a source and publishing them + * through the metrics interfaces. + */ +@InterfaceAudience.Private +public class ReplicationSourceMetrics { + + public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue"; + public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp"; + public static final String SOURCE_LOG_EDITS_READ = "source.logEditsRead"; + public static final String SOURCE_LOG_EDITS_FILTERED = "source.logEditsFiltered"; + public static final String SOURCE_SHIPPED_BATCHES = "source.shippedBatches"; + public static final String SOURCE_SHIPPED_OPS = "source.shippedOps"; + + public static final Log LOG = LogFactory.getLog(ReplicationSourceMetrics.class); + private String id; + + private long lastTimestamp = 0; + private int lastQueueSize = 0; + + private String sizeOfLogQueKey; + private String ageOfLastShippedOpKey; + private String logEditsReadKey; + private String logEditsFilteredKey; + private final String shippedBatchesKey; + private final String shippedOpsKey; + + private ReplicationMetricsSource rms; + + /** + * Constructor used to register the metrics + * + * @param id Name of the source this class is monitoring + */ + public ReplicationSourceMetrics(String id) { + this.id = id; + + sizeOfLogQueKey = "source." + id + ".sizeOfLogQueue"; + ageOfLastShippedOpKey = "source." + id + ".ageOfLastShippedOp"; + logEditsReadKey = "source." + id + ".logEditsRead"; + logEditsFilteredKey = "source." + id + ".logEditsFiltered"; + shippedBatchesKey = "source." + this.id + ".shippedBatches"; + shippedOpsKey = "source." + this.id + ".shippedOps"; + rms = ReplicationMetricsSourceFactory.getInstance(); + } + + /** + * Set the age of the last edit that was shipped + * + * @param timestamp write time of the edit + */ + public void setAgeOfLastShippedOp(long timestamp) { + long age = System.currentTimeMillis() - timestamp; + rms.setGauge(ageOfLastShippedOpKey, age); + rms.setGauge(SOURCE_AGE_OF_LAST_SHIPPED_OP, age); + this.lastTimestamp = timestamp; + } + + /** + * Convenience method to use the last given timestamp to refresh the age of the last edit. Used + * when replication fails and need to keep that metric accurate. + */ + public void refreshAgeOfLastShippedOp() { + if (this.lastTimestamp > 0) { + setAgeOfLastShippedOp(this.lastTimestamp); + } + } + + /** + * Set the size of the log queue + * + * @param size the size. + */ + public void setSizeOfLogQueue(int size) { + rms.setGauge(sizeOfLogQueKey, size); + rms.incGauge(SOURCE_SIZE_OF_LOG_QUEUE, size - lastQueueSize); + lastQueueSize = size; + } + + /** + * Add on the the number of log edits read + * + * @param delta the number of log edits read. + */ + private void incrLogEditsRead(long delta) { + rms.incCounters(logEditsReadKey, delta); + rms.incCounters(SOURCE_LOG_EDITS_READ, delta); + } + + /** Increment the number of log edits read by one. */ + public void incrLogEditsRead() { + incrLogEditsRead(1); + } + + /** + * Add on the number of log edits filtered + * + * @param delta the number filtered. + */ + private void incrLogEditsFiltered(long delta) { + rms.incCounters(logEditsFilteredKey, delta); + rms.incCounters(SOURCE_LOG_EDITS_FILTERED, delta); + } + + /** The number of log edits filtered out. */ + public void incrLogEditsFiltered() { + incrLogEditsFiltered(1); + } + + /** + * Convience method to apply changes to metrics do to shipping a batch of logs. + * + * @param batchSize the size of the batch that was shipped to sinks. + */ + public void shipBatch(long batchSize) { + rms.incCounters(shippedBatchesKey, 1); + rms.incCounters(SOURCE_SHIPPED_BATCHES, 1); + rms.incCounters(shippedOpsKey, batchSize); + rms.incCounters(SOURCE_SHIPPED_OPS, batchSize); + } + + /** Removes all metrics about this Source. */ + public void clear() { + rms.removeGauge(sizeOfLogQueKey); + rms.decGauge(SOURCE_SIZE_OF_LOG_QUEUE, lastQueueSize); + lastQueueSize = 0; + rms.removeGauge(ageOfLastShippedOpKey); + + rms.removeCounter(logEditsFilteredKey); + rms.removeCounter(logEditsReadKey); + + } +} diff --git pom.xml pom.xml index 2cd0efc..c95762d 100644 --- pom.xml +++ pom.xml @@ -41,6 +41,9 @@ http://hbase.apache.org hbase-server + hbase-hadoop2-compat + hbase-hadoop1-compat + hbase-hadoop-compat hbase-common hbase-it @@ -600,7 +603,7 @@ gnu false - src/assembly/all.xml + ${assembly.file} @@ -786,6 +789,8 @@ 1.6 ${project.version} + 2.0.0-alpha + 1.0.3 1.5.3 1.2 1.4 @@ -837,6 +842,7 @@ hbase-server-${project.version}-tests.jar 2.12-TRUNK-HBASE-2 surefire-junit47 + hbase-hadoop1-compat false false @@ -866,6 +872,16 @@ ${project.version} + org.apache.hbase + hbase-hadoop-compat + ${project.version} + + + org.apache.hbase + ${compat.module} + ${project.version} + + hbase-server org.apache.hbase ${project.version} @@ -1227,8 +1243,10 @@ - 1.0.3 + ${hadoop-one.version} 1.4.3 + hbase-hadoop1-compat + src/assembly/one.xml @@ -1282,26 +1300,28 @@ 1.6.1 - 2.0.0-alpha + ${hadoop-two.version} + hbase-hadoop2-compat + src/assembly/two.xml org.apache.hadoop hadoop-client - ${hadoop.version} + ${hadoop-two.version} org.apache.hadoop hadoop-annotations - ${hadoop.version} + ${hadoop-two.version} org.apache.hadoop hadoop-minicluster - ${hadoop.version} + ${hadoop-two.version} diff --git src/assembly/all.xml src/assembly/all.xml deleted file mode 100644 index 5a9866b..0000000 --- src/assembly/all.xml +++ /dev/null @@ -1,153 +0,0 @@ - - - - - - all - - tar.gz - - - - - target/site - docs - - - - / - - *.txt - pom.xml - - - - - conf - conf - 0644 - 0755 - - - - bin - bin - 0755 - 0755 - - - conf - conf - 0644 - 0755 - - - - src - src - 0644 - 0755 - - - - dev-support - dev-support - 0755 - 0755 - - - - hbase-server/src/main/ruby - lib/ruby - 0644 - 0755 - - - - hbase-server/target/hbase-webapps - hbase-webapps - 0644 - 0755 - - - - hbase-server/target/native - native - 0755 - 0755 - - *.so - - - - - ${parent.basedir}/hbase-server/target/ - lib - - ${server.test.jar} - - 0644 - - - - - - true - - - - - org.apache.hbase:hbase-* - - - - - - - target/ - test/ - .classpath - .project - .settings/ - - - - - - - lib - false - - - - - - - diff --git src/assembly/one.xml src/assembly/one.xml new file mode 100644 index 0000000..c3b6610 --- /dev/null +++ src/assembly/one.xml @@ -0,0 +1,156 @@ + + + + + + all + + tar.gz + + + + + target/site + docs + + + + / + + *.txt + pom.xml + + + + + conf + conf + 0644 + 0755 + + + + bin + bin + 0755 + 0755 + + + conf + conf + 0644 + 0755 + + + + src + src + 0644 + 0755 + + + + dev-support + dev-support + 0755 + 0755 + + + + hbase-server/src/main/ruby + lib/ruby + 0644 + 0755 + + + + hbase-server/target/hbase-webapps + hbase-webapps + 0644 + 0755 + + + + hbase-server/target/native + native + 0755 + 0755 + + *.so + + + + + ${parent.basedir}/hbase-server/target/ + lib + + ${server.test.jar} + + 0644 + + + + + + true + + + + + org.apache.hbase:hbase-common + org.apache.hbase:hbase-hadoop-compat + org.apache.hbase:hbase-hadoop1-compat + org.apache.hbase:hbase-server + + + + + + + target/ + test/ + .classpath + .project + .settings/ + + + + + + + lib + false + + + + + + + diff --git src/assembly/two.xml src/assembly/two.xml new file mode 100644 index 0000000..9f5a844 --- /dev/null +++ src/assembly/two.xml @@ -0,0 +1,156 @@ + + + + + + all + + tar.gz + + + + + target/site + docs + + + + / + + *.txt + pom.xml + + + + + conf + conf + 0644 + 0755 + + + + bin + bin + 0755 + 0755 + + + conf + conf + 0644 + 0755 + + + + src + src + 0644 + 0755 + + + + dev-support + dev-support + 0755 + 0755 + + + + hbase-server/src/main/ruby + lib/ruby + 0644 + 0755 + + + + hbase-server/target/hbase-webapps + hbase-webapps + 0644 + 0755 + + + + hbase-server/target/native + native + 0755 + 0755 + + *.so + + + + + ${parent.basedir}/hbase-server/target/ + lib + + ${server.test.jar} + + 0644 + + + + + + true + + + + + org.apache.hbase:hbase-common + org.apache.hbase:hbase-hadoop-compat + org.apache.hbase:hbase-hadoop2-compat + org.apache.hbase:hbase-server + + + + + + + target/ + test/ + .classpath + .project + .settings/ + + + + + + + lib + false + + + + + + +