Index: conf/hadoop-metrics.properties =================================================================== --- conf/hadoop-metrics.properties (revision 727057) +++ conf/hadoop-metrics.properties (working copy) @@ -7,7 +7,9 @@ #dfs.fileName=/tmp/dfsmetrics.log # Configuration of the "dfs" context for ganglia +# Pick one: Ganglia 3.0 (former) or Ganglia 3.1 (latter) # dfs.class=org.apache.hadoop.metrics.ganglia.GangliaContext +# dfs.class=org.apache.hadoop.metrics.ganglia.GangliaContext31 # dfs.period=10 # dfs.servers=localhost:8649 @@ -21,13 +23,15 @@ #mapred.fileName=/tmp/mrmetrics.log # Configuration of the "mapred" context for ganglia +# Pick one: Ganglia 3.0 (former) or Ganglia 3.1 (latter) # mapred.class=org.apache.hadoop.metrics.ganglia.GangliaContext +# mapred.class=org.apache.hadoop.metrics.ganglia.GangliaContext31 # mapred.period=10 # mapred.servers=localhost:8649 # Configuration of the "jvm" context for null -jvm.class=org.apache.hadoop.metrics.spi.NullContext +#jvm.class=org.apache.hadoop.metrics.spi.NullContext # Configuration of the "jvm" context for file #jvm.class=org.apache.hadoop.metrics.file.FileContext @@ -35,6 +39,8 @@ #jvm.fileName=/tmp/jvmmetrics.log # Configuration of the "jvm" context for ganglia +# Pick one: Ganglia 3.0 (former) or Ganglia 3.1 (latter) # jvm.class=org.apache.hadoop.metrics.ganglia.GangliaContext -# jvm.period=10 -# jvm.servers=localhost:8649 +jvm.class=org.apache.hadoop.metrics.ganglia.GangliaContext31 +jvm.period=10 +jvm.servers=localhost:8649 Index: src/core/org/apache/hadoop/metrics/ganglia/GangliaContext.java =================================================================== --- src/core/org/apache/hadoop/metrics/ganglia/GangliaContext.java (revision 759439) +++ src/core/org/apache/hadoop/metrics/ganglia/GangliaContext.java (working copy) @@ -71,16 +71,16 @@ typeTable.put(Float.class, "float"); } - private byte[] buffer = new byte[BUFFER_SIZE]; - private int offset; + protected byte[] buffer = new byte[BUFFER_SIZE]; + protected int offset; - private List metricsServers; + protected List metricsServers; private Map unitsTable; private Map slopeTable; private Map tmaxTable; private Map dmaxTable; - private DatagramSocket datagramSocket; + protected DatagramSocket datagramSocket; /** Creates a new instance of GangliaContext */ public GangliaContext() { @@ -144,7 +144,7 @@ } } - private void emitMetric(String name, String type, String value) + protected void emitMetric(String name, String type, String value) throws IOException { String units = getUnits(name); int slope = getSlope(name); @@ -168,7 +168,7 @@ } } - private String getUnits(String metricName) { + protected String getUnits(String metricName) { String result = unitsTable.get(metricName); if (result == null) { result = DEFAULT_UNITS; @@ -176,7 +176,7 @@ return result; } - private int getSlope(String metricName) { + protected int getSlope(String metricName) { String slopeString = slopeTable.get(metricName); if (slopeString == null) { slopeString = DEFAULT_SLOPE; @@ -184,10 +184,13 @@ return ("zero".equals(slopeString) ? 0 : 3); // see gmetric.c } - private int getTmax(String metricName) { + protected int getTmax(String metricName) { if (tmaxTable == null) { return DEFAULT_TMAX; } + if (tmaxTable == null) { + return DEFAULT_TMAX; + } String tmaxString = tmaxTable.get(metricName); if (tmaxString == null) { return DEFAULT_TMAX; @@ -197,7 +200,7 @@ } } - private int getDmax(String metricName) { + protected int getDmax(String metricName) { String dmaxString = dmaxTable.get(metricName); if (dmaxString == null) { return DEFAULT_DMAX; @@ -212,7 +215,7 @@ * as an int, followed by the bytes of the string, padded if necessary to * a multiple of 4. */ - private void xdr_string(String s) { + protected void xdr_string(String s) { byte[] bytes = s.getBytes(); int len = bytes.length; xdr_int(len); @@ -234,7 +237,7 @@ /** * Puts an integer into the buffer as 4 bytes, big-endian. */ - private void xdr_int(int i) { + protected void xdr_int(int i) { buffer[offset++] = (byte)((i >> 24) & 0xff); buffer[offset++] = (byte)((i >> 16) & 0xff); buffer[offset++] = (byte)((i >> 8) & 0xff); Index: src/core/org/apache/hadoop/metrics/ganglia/GangliaContext31.java =================================================================== --- src/core/org/apache/hadoop/metrics/ganglia/GangliaContext31.java (revision 714217) +++ src/core/org/apache/hadoop/metrics/ganglia/GangliaContext31.java (working copy) @@ -22,126 +22,115 @@ import java.io.IOException; import java.net.DatagramPacket; -import java.net.DatagramSocket; import java.net.SocketAddress; -import java.net.SocketException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.net.UnknownHostException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics.ContextFactory; -import org.apache.hadoop.metrics.MetricsException; -import org.apache.hadoop.metrics.spi.AbstractMetricsContext; -import org.apache.hadoop.metrics.spi.OutputRecord; -import org.apache.hadoop.metrics.spi.Util; +import org.apache.hadoop.net.DNS; /** - * Context for sending metrics to Ganglia. + * Context for sending metrics to Ganglia version 3.1.x. * + * 3.1.1 has a slightly different wire portal compared to 3.0.x. */ -public class GangliaContext extends AbstractMetricsContext { - - private static final String PERIOD_PROPERTY = "period"; - private static final String SERVERS_PROPERTY = "servers"; - private static final String UNITS_PROPERTY = "units"; - private static final String SLOPE_PROPERTY = "slope"; - private static final String TMAX_PROPERTY = "tmax"; - private static final String DMAX_PROPERTY = "dmax"; - - private static final String DEFAULT_UNITS = ""; - private static final String DEFAULT_SLOPE = "both"; - private static final int DEFAULT_TMAX = 60; - private static final int DEFAULT_DMAX = 0; - private static final int DEFAULT_PORT = 8649; - private static final int BUFFER_SIZE = 1500; // as per libgmond.c - - private static final Map typeTable = new HashMap(5); - - static { - typeTable.put(String.class, "string"); - typeTable.put(Byte.class, "int8"); - typeTable.put(Short.class, "int16"); - typeTable.put(Integer.class, "int32"); - typeTable.put(Float.class, "float"); - } - - private byte[] buffer = new byte[BUFFER_SIZE]; - private int offset; - - private List metricsServers; - private Map unitsTable; - private Map slopeTable; - private Map tmaxTable; - private Map dmaxTable; - - private DatagramSocket datagramSocket; - - /** Creates a new instance of GangliaContext */ - public GangliaContext() { - } - - public void init(String contextName, ContextFactory factory) - { +public class GangliaContext31 extends GangliaContext { + + String hostName = "UNKNOWN.example.com"; + + private static final Log LOG = + LogFactory.getLog("org.apache.hadoop.util.GangliaContext31"); + + public void init(String contextName, ContextFactory factory) { super.init(contextName, factory); - - String periodStr = getAttribute(PERIOD_PROPERTY); - if (periodStr != null) { - int period = 0; + + LOG.debug("Initializing the GangliaContext31 for Ganglia 3.1 metrics."); + + // Take the hostname from the DNS class. + + Configuration conf = new Configuration(); + + if (conf.get("slave.host.name") != null) { + hostName = conf.get("slave.host.name"); + } else { try { - period = Integer.parseInt(periodStr); - } catch (NumberFormatException nfe) { + hostName = DNS.getDefaultHost( + conf.get("dfs.datanode.dns.interface","default"), + conf.get("dfs.datanode.dns.nameserver","default")); + } catch (UnknownHostException uhe) { + LOG.error(uhe); + hostName = "UNKNOWN.example.com"; } - if (period <= 0) { - throw new MetricsException("Invalid period: " + periodStr); - } - setPeriod(period); } - - metricsServers = - Util.parse(getAttribute(SERVERS_PROPERTY), DEFAULT_PORT); - - unitsTable = getAttributeTable(UNITS_PROPERTY); - slopeTable = getAttributeTable(SLOPE_PROPERTY); - tmaxTable = getAttributeTable(TMAX_PROPERTY); - dmaxTable = getAttributeTable(DMAX_PROPERTY); - - try { - datagramSocket = new DatagramSocket(); - } - catch (SocketException se) { - se.printStackTrace(); - } } - - public void emitRecord(String contextName, String recordName, OutputRecord outRec) + + protected void emitMetric(String name, String type, String value) throws IOException { - // emit each metric in turn - for (String metricName : outRec.getMetricNames()) { - Object metric = outRec.getMetric(metricName); - String type = typeTable.get(metric.getClass()); - emitMetric(metricName, type, metric.toString()); + if (name == null) { + LOG.warn("Metric was emitted with no name."); + return; + } else if (value == null) { + LOG.warn("Metric name " + name +" was emitted with a null value."); + } else if (type == null) { + LOG.warn("Metric name " + name + ", value " + value + " has no type."); } - - } - - private void emitMetric(String name, String type, String value) - throws IOException - { + + LOG.debug("Emitting metric " + name + ", type " + type + ", value " + + value + " from hostname" + hostName); + String units = getUnits(name); + if (units == null) { + LOG.warn("Metric name " + name + ", value " + value + + " had 'null' units"); + units = ""; + } int slope = getSlope(name); int tmax = getTmax(name); int dmax = getDmax(name); - offset = 0; - xdr_int(0); // metric_user_defined - xdr_string(type); - xdr_string(name); - xdr_string(value); - xdr_string(units); - xdr_int(slope); - xdr_int(tmax); - xdr_int(dmax); + String groupName = name.substring(0,name.lastIndexOf(".")); + + // The following XDR recipe was done through a careful reading of + // gm_protocol.x in Ganglia 3.1 and carefully examining the output of + // the gmetric utility with strace. + + // First we send out a metadata message + xdr_int(128); // metric_id = metadata_msg + xdr_string(hostName); // hostname + xdr_string(name); // metric name + xdr_int(0); // spoof = False + xdr_string(type); // metric type + xdr_string(name); // metric name + xdr_string(units); // units + xdr_int(slope); // slope + xdr_int(tmax); // tmax, the maximum time between metrics + xdr_int(dmax); // dmax, the maximum data value + + xdr_int(1); /*Num of the entries in extra_value field for + Ganglia 3.1.x*/ + xdr_string("GROUP"); /*Group attribute*/ + xdr_string(groupName); /*Group value*/ + + for (SocketAddress socketAddress : metricsServers) { + DatagramPacket packet = + new DatagramPacket(buffer, offset, socketAddress); + datagramSocket.send(packet); + } + + // Now we send out a message with the actual value. + // Technically, we only need to send out the metadata message once for + // each metric, but I don't want to have to record which metrics we did and + // did not send. + offset = 0; + xdr_int(133); // we are sending a string value + xdr_string(hostName); // hostName + xdr_string(name); // metric name + xdr_int(0); // spoof = False + xdr_string("%s"); // format field + xdr_string(value); // metric value for (SocketAddress socketAddress : metricsServers) { DatagramPacket packet = @@ -149,75 +138,5 @@ datagramSocket.send(packet); } } - - private String getUnits(String metricName) { - String result = unitsTable.get(metricName); - if (result == null) { - result = DEFAULT_UNITS; - } - return result; - } - - private int getSlope(String metricName) { - String slopeString = slopeTable.get(metricName); - if (slopeString == null) { - slopeString = DEFAULT_SLOPE; - } - return ("zero".equals(slopeString) ? 0 : 3); // see gmetric.c - } - - private int getTmax(String metricName) { - String tmaxString = tmaxTable.get(metricName); - if (tmaxString == null) { - return DEFAULT_TMAX; - } - else { - return Integer.parseInt(tmaxString); - } - } - - private int getDmax(String metricName) { - String dmaxString = dmaxTable.get(metricName); - if (dmaxString == null) { - return DEFAULT_DMAX; - } - else { - return Integer.parseInt(dmaxString); - } - } - - /** - * Puts a string into the buffer by first writing the size of the string - * as an int, followed by the bytes of the string, padded if necessary to - * a multiple of 4. - */ - private void xdr_string(String s) { - byte[] bytes = s.getBytes(); - int len = bytes.length; - xdr_int(len); - System.arraycopy(bytes, 0, buffer, offset, len); - offset += len; - pad(); - } - /** - * Pads the buffer with zero bytes up to the nearest multiple of 4. - */ - private void pad() { - int newOffset = ((offset + 3) / 4) * 4; - while (offset < newOffset) { - buffer[offset++] = 0; - } - } - - /** - * Puts an integer into the buffer as 4 bytes, big-endian. - */ - private void xdr_int(int i) { - buffer[offset++] = (byte)((i >> 24) & 0xff); - buffer[offset++] = (byte)((i >> 16) & 0xff); - buffer[offset++] = (byte)((i >> 8) & 0xff); - buffer[offset++] = (byte)(i & 0xff); - } - } Index: src/test/org/apache/hadoop/metrics/TestGangliaContext31.java =================================================================== --- src/test/org/apache/hadoop/metrics/TestGangliaContext31.java (revision 0) +++ src/test/org/apache/hadoop/metrics/TestGangliaContext31.java (revision 0) @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.metrics; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetSocketAddress; +import java.net.SocketException; +import java.net.UnknownHostException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.net.DNS; + +/** + * This class creates starts up a Hadoop cluster and configures it to use the + * GangliaContext31 metrics class. We then verify that it sends out valid + * Ganglia data to a separate thread spawned to listen on the Ganglia socket. + */ +public class TestGangliaContext31 extends junit.framework.TestCase { + + private static final Log LOG = LogFactory.getLog(TestGangliaContext31.class); + + /** + * This class is a runnable that will listen for Ganglia connections. + */ + class GangliaSocketListener implements Runnable { + + private boolean isConfigured = false; + private boolean hasData = false; + private byte[] byteData; + private int port; + + public void run() { + DatagramSocket s; + try { + s = new DatagramSocket(); + setPort(s.getLocalPort()); + setConfigured(true); + } catch (IOException e) { + LOG.warn(e); + synchronized(this) { + this.notify(); + } + return; + } + + byte [] b = new byte[8192]; + DatagramPacket info = new DatagramPacket(b, b.length); + + synchronized(this) { + this.notify(); + } + try { + s.receive(info); + } catch (IOException e) { + LOG.warn(e); + synchronized(this) { + this.notify(); + } + return; + } + LOG.info("Got a new packet, length " + info.getLength()); + int bytesRead = info.getLength(); + if (bytesRead > 0) + setHasData(true); + + byteData = new byte[info.getLength()]; + System.arraycopy(info.getData(), 0, byteData, 0, bytesRead); + synchronized(this) { + this.notify(); + } + } + + public void setConfigured(boolean isConfigured) { + this.isConfigured = isConfigured; + } + + public boolean getConfigured() { + return isConfigured; + } + + public void setHasData(boolean hasData) { + this.hasData = hasData; + } + + public boolean getHasData() { + return hasData; + } + + public byte[] getBytes() { + return byteData; + } + + public void setPort(int port) { + this.port = port; + } + + public int getPort() { + return port; + } + + } + + public String getHostname(Configuration conf) { + String hostName; + if (conf.get("slave.host.name") != null) { + hostName = conf.get("slave.host.name"); + } else { + try { + hostName = DNS.getDefaultHost( + conf.get("dfs.datanode.dns.interface","default"), + conf.get("dfs.datanode.dns.nameserver","default")); + } catch (UnknownHostException uhe) { + LOG.error(uhe); + hostName = "UNKNOWN.example.com"; + } + } + return hostName; + } + + public void testGanglia31Metrics() throws IOException { + Configuration conf = new Configuration(); + + String hostName = getHostname(conf); + + GangliaSocketListener listener = new GangliaSocketListener(); + Thread listenerThread = new Thread(listener); + listenerThread.start(); + try { + synchronized(listener) { + listener.wait(); + } + } catch (InterruptedException e) { + LOG.warn(e); + } + + assertTrue("Could not configure the socket listener for Ganglia", listener.getConfigured()); + + LOG.info("Listening to port " + listener.getPort()); + + ContextFactory contextFactory = ContextFactory.getFactory(); + contextFactory.setAttribute("dfs.class", "org.apache.hadoop.metrics.ganglia.GangliaContext31"); + contextFactory.setAttribute("dfs.period", "1"); + contextFactory.setAttribute("dfs.servers", "localhost:" + listener.getPort()); + + MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null); + try { + if (!listener.getHasData()) + synchronized(listener) { + listener.wait(5*1000); // Wait at most 5 seconds for Ganglia data + } + } catch (InterruptedException e) { + LOG.warn(e); + } + assertTrue("Did not recieve Ganglia data", listener.getHasData()); + cluster.shutdown(); + + byte [] hostNameBytes = hostName.getBytes(); + + byte [] xdrBytes = listener.getBytes(); + + // Try to make sure that the received bytes from Ganglia has the correct hostname for this host + boolean hasHostname = false; + LOG.info("Checking to make sure that the Ganglia data contains host " + hostName); + for (int i=0; i