Index: src/core/org/apache/hadoop/metrics/ganglia/GangliaContext31.java ================================================================== --- src/core/org/apache/hadoop/metrics/ganglia/GangliaContext31.java 1970-01-01 08:00:00.000000000 +0800 +++ src/core/org/apache/hadoop/metrics/ganglia/GangliaContext31.java 2008-11-25 19:02:18.000000000 +0800 @@ -0,0 +1,137 @@ +/* + * GangliaContext31.java + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.metrics.ganglia; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.SocketAddress; +import java.net.SocketException; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.metrics.ContextFactory; +import org.apache.hadoop.metrics.MetricsException; +import org.apache.hadoop.metrics.spi.AbstractMetricsContext; +import org.apache.hadoop.metrics.spi.OutputRecord; +import org.apache.hadoop.metrics.spi.Util; + +/** + * Context for sending metrics to Ganglia version 3.1.x. + * + * 3.1.1 has a slightly different wire portal compared to 3.0.x. + */ +public class GangliaContext31 extends GangliaContext { + + String hostName = "UNKNOWN.example.com"; + + private static final Log LOG = + LogFactory.getLog("org.apache.hadoop.util.GangliaContext31"); + + public void init(String contextName, ContextFactory factory) { + super.init(contextName, factory); + + LOG.debug("Initializing the GangliaContext31 for Ganglia 3.1 metrics."); + + try { + InetAddress localMachine = InetAddress.getLocalHost(); + hostName = localMachine.getHostName(); + } catch (UnknownHostException uhe) { + LOG.error(uhe); + } + } + + protected void emitMetric(String name, String type, String value) + throws IOException + { + if (name == null) { + LOG.warn("Metric was emitted with no name."); + return; + } else if (value == null) { + LOG.warn("Metric name " + name +" was emitted with a null value."); + } else if (type == null) { + LOG.warn("Metric name " + name + ", value " + value + " has no type."); + } + + LOG.debug("Emitting metric " + name + ", type " + type + ", value " + value + " from hostname" + hostName); + + String units = getUnits(name); + if (units == null) { + LOG.warn("Metric name " + name + ", value " + value + + " had 'null' units"); + units = ""; + } + int slope = getSlope(name); + int tmax = getTmax(name); + int dmax = getDmax(name); + offset = 0; + String GroupName = name.substring(0,name.lastIndexOf(".")); + // The following XDR recipe was done through a careful reading of + // gm_protocol.x in Ganglia 3.1 and carefully examining the output of + // the gmetric utility with strace. + + // First we send out a metadata message + xdr_int(128); // metric_id = metadata_msg + xdr_string(hostName); // hostname + xdr_string(name); // metric name + xdr_int(0); // spoof = False + xdr_string(type); // metric type + xdr_string(name); // metric name + xdr_string(units); // units + xdr_int(slope); // slope + xdr_int(tmax); // tmax, the maximum time between metrics + xdr_int(dmax); // dmax, the maximum data value + //xdr_string(""); // Empty extra_value field for Ganglia 3.1. + xdr_int(1); + xdr_string("GROUP"); + xdr_string(GroupName); + for (SocketAddress socketAddress : metricsServers) { + DatagramPacket packet = + new DatagramPacket(buffer, offset, socketAddress); + datagramSocket.send(packet); + } + + // Now we send out a message with the actual value. + // Technically, we only need to send out the metadata message once for + // each metric, but I don't want to have to record which metrics we did and + // did not send. + offset = 0; + xdr_int(133); // we are sending a string value + xdr_string(hostName); // hostName + xdr_string(name); // metric name + xdr_int(0); // spoof = False + xdr_string("%s"); // format field + xdr_string(value); // metric value + + for (SocketAddress socketAddress : metricsServers) { + DatagramPacket packet = + new DatagramPacket(buffer, offset, socketAddress); + datagramSocket.send(packet); + } + } + +} Index: src/core/org/apache/hadoop/metrics/ganglia/GangliaContext.java ================================================================== --- src/core/org/apache/hadoop/metrics/ganglia/GangliaContext.java 2008-10-28 11:40:06.000000000 +0800 +++ src/core/org/apache/hadoop/metrics/ganglia/GangliaContext.java 2008-11-25 18:03:35.000000000 +0800 @@ -29,6 +29,9 @@ import java.util.List; import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + import org.apache.hadoop.metrics.ContextFactory; import org.apache.hadoop.metrics.MetricsException; import org.apache.hadoop.metrics.spi.AbstractMetricsContext; @@ -54,7 +57,10 @@ private static final int DEFAULT_DMAX = 0; private static final int DEFAULT_PORT = 8649; private static final int BUFFER_SIZE = 1500; // as per libgmond.c - + + private static final Log LOG = + LogFactory.getLog("org.apache.hadoop.util.GangliaContext"); + private static final Map typeTable = new HashMap(5); static { @@ -62,19 +68,20 @@ typeTable.put(Byte.class, "int8"); typeTable.put(Short.class, "int16"); typeTable.put(Integer.class, "int32"); + typeTable.put(Long.class, "float"); typeTable.put(Float.class, "float"); } - private byte[] buffer = new byte[BUFFER_SIZE]; - private int offset; + protected byte[] buffer = new byte[BUFFER_SIZE]; + protected int offset; - private List metricsServers; + protected List metricsServers; private Map unitsTable; private Map slopeTable; private Map tmaxTable; private Map dmaxTable; - private DatagramSocket datagramSocket; + protected DatagramSocket datagramSocket; /** Creates a new instance of GangliaContext */ public GangliaContext() { @@ -112,20 +119,36 @@ se.printStackTrace(); } } - + public void emitRecord(String contextName, String recordName, OutputRecord outRec) throws IOException { + + // Setup so that the records have the proper leader names so they are unambiguous at the ganglia level, and this prevents a lot of rework + StringBuffer sb = new StringBuffer(); + sb.setLength(0); + sb.append(contextName); + sb.append('.'); + sb.append(recordName); + sb.append('.'); + int sbBaseLen = sb.length(); + // emit each metric in turn for (String metricName : outRec.getMetricNames()) { Object metric = outRec.getMetric(metricName); String type = typeTable.get(metric.getClass()); - emitMetric(metricName, type, metric.toString()); + if (type != null) { + sb.append(metricName); + emitMetric(sb.toString(), type, metric.toString()); + sb.setLength(sbBaseLen); + } else { + LOG.warn("Unknown metrics type: " + metric.getClass()); + } } } - private void emitMetric(String name, String type, String value) + protected void emitMetric(String name, String type, String value) throws IOException { String units = getUnits(name); @@ -150,7 +173,7 @@ } } - private String getUnits(String metricName) { + protected String getUnits(String metricName) { String result = unitsTable.get(metricName); if (result == null) { result = DEFAULT_UNITS; @@ -158,7 +181,7 @@ return result; } - private int getSlope(String metricName) { + protected int getSlope(String metricName) { String slopeString = slopeTable.get(metricName); if (slopeString == null) { slopeString = DEFAULT_SLOPE; @@ -166,7 +189,10 @@ return ("zero".equals(slopeString) ? 0 : 3); // see gmetric.c } - private int getTmax(String metricName) { + protected int getTmax(String metricName) { + if (tmaxTable == null) { + return DEFAULT_TMAX; + } String tmaxString = tmaxTable.get(metricName); if (tmaxString == null) { return DEFAULT_TMAX; @@ -176,7 +202,7 @@ } } - private int getDmax(String metricName) { + protected int getDmax(String metricName) { String dmaxString = dmaxTable.get(metricName); if (dmaxString == null) { return DEFAULT_DMAX; @@ -191,7 +217,7 @@ * as an int, followed by the bytes of the string, padded if necessary to * a multiple of 4. */ - private void xdr_string(String s) { + protected void xdr_string(String s) { byte[] bytes = s.getBytes(); int len = bytes.length; xdr_int(len); @@ -213,7 +239,7 @@ /** * Puts an integer into the buffer as 4 bytes, big-endian. */ - private void xdr_int(int i) { + protected void xdr_int(int i) { buffer[offset++] = (byte)((i >> 24) & 0xff); buffer[offset++] = (byte)((i >> 16) & 0xff); buffer[offset++] = (byte)((i >> 8) & 0xff);