Index: conf/hadoop-metrics.properties
===================================================================
--- conf/hadoop-metrics.properties	(revision 771522)
+++ conf/hadoop-metrics.properties	(working copy)
@@ -7,7 +7,9 @@
 #dfs.fileName=/tmp/dfsmetrics.log
 
 # Configuration of the "dfs" context for ganglia
+# Pick one: Ganglia 3.0 (former) or Ganglia 3.1 (latter)
 # dfs.class=org.apache.hadoop.metrics.ganglia.GangliaContext
+# dfs.class=org.apache.hadoop.metrics.ganglia.GangliaContext31
 # dfs.period=10
 # dfs.servers=localhost:8649
 
@@ -21,13 +23,15 @@
 #mapred.fileName=/tmp/mrmetrics.log
 
 # Configuration of the "mapred" context for ganglia
+# Pick one: Ganglia 3.0 (former) or Ganglia 3.1 (latter)
 # mapred.class=org.apache.hadoop.metrics.ganglia.GangliaContext
+# mapred.class=org.apache.hadoop.metrics.ganglia.GangliaContext31
 # mapred.period=10
 # mapred.servers=localhost:8649
 
 
 # Configuration of the "jvm" context for null
-jvm.class=org.apache.hadoop.metrics.spi.NullContext
+#jvm.class=org.apache.hadoop.metrics.spi.NullContext
 
 # Configuration of the "jvm" context for file
 #jvm.class=org.apache.hadoop.metrics.file.FileContext
Index: src/test/core/org/apache/hadoop/metrics/TestGangliaContext31.java
===================================================================
--- src/test/core/org/apache/hadoop/metrics/TestGangliaContext31.java	(revision 0)
+++ src/test/core/org/apache/hadoop/metrics/TestGangliaContext31.java	(revision 0)
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetSocketAddress;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.net.DNS;
+
+/**
+ * This class creates starts up a Hadoop cluster and configures it to use the
+ * GangliaContext31 metrics class. We then verify that it sends out valid
+ * Ganglia data to a separate thread spawned to listen on the Ganglia socket.
+ */
+public class TestGangliaContext31 extends junit.framework.TestCase {
+
+  private static final Log LOG = LogFactory.getLog(TestGangliaContext31.class);
+
+  /**
+   * This class is a runnable that will listen for Ganglia connections.
+   */
+  class GangliaSocketListener implements Runnable {
+
+    private boolean isConfigured = false;
+    private boolean hasData = false;
+    private byte[] byteData;
+    private int port;
+    
+    public void run() {
+      DatagramSocket s;
+      try {
+        s = new DatagramSocket();
+        setPort(s.getLocalPort());
+        setConfigured(true);
+      } catch (IOException e) {
+        LOG.warn(e);
+        synchronized(this) {
+          this.notify();
+        }
+        return;
+      }
+
+      byte [] b = new byte[8192];
+      DatagramPacket info = new DatagramPacket(b, b.length);
+
+      synchronized(this) {
+        this.notify();
+      }
+      try {
+        s.receive(info);
+      } catch (IOException e) {
+        LOG.warn(e);
+        synchronized(this) {
+          this.notify();
+        }
+        return;
+      }
+      LOG.info("Got a new packet, length " + info.getLength());
+      int bytesRead = info.getLength();
+      if (bytesRead > 0)
+        setHasData(true);
+      
+      byteData = new byte[info.getLength()];
+      System.arraycopy(info.getData(), 0, byteData, 0, bytesRead);
+      synchronized(this) {
+        this.notify();
+      }
+    }
+
+    public void setConfigured(boolean isConfigured) {
+      this.isConfigured = isConfigured;
+    }
+
+    public boolean getConfigured() {
+      return isConfigured;
+    }
+
+    public void setHasData(boolean hasData) {
+      this.hasData = hasData;
+    }
+
+    public boolean getHasData() {
+      return hasData;
+    }
+    
+    public byte[] getBytes() {
+      return byteData;
+    }
+
+    public void setPort(int port) {
+      this.port = port;
+    }
+
+    public int getPort() {
+      return port;
+    }
+
+  }
+
+  public String getHostname(Configuration conf) {
+    String hostName;
+    if (conf.get("slave.host.name") != null) {
+      hostName = conf.get("slave.host.name");
+    } else {
+      try {
+        hostName = DNS.getDefaultHost(
+          conf.get("dfs.datanode.dns.interface","default"),
+          conf.get("dfs.datanode.dns.nameserver","default"));
+      } catch (UnknownHostException uhe) {
+        LOG.error(uhe);
+      hostName = "UNKNOWN.example.com";
+      }
+    }
+    return hostName;
+  }
+  
+  public void testGanglia31Metrics() throws IOException {
+    Configuration conf = new Configuration();
+
+    String hostName = getHostname(conf);
+    
+    GangliaSocketListener listener = new GangliaSocketListener();
+    Thread listenerThread = new Thread(listener);
+    listenerThread.start();
+    try {
+      synchronized(listener) {
+        listener.wait();
+      }
+    } catch (InterruptedException e) {
+      LOG.warn(e);
+    }
+    
+    assertTrue("Could not configure the socket listener for Ganglia", listener.getConfigured());
+
+    LOG.info("Listening to port " + listener.getPort());
+
+    ContextFactory contextFactory = ContextFactory.getFactory();
+    contextFactory.setAttribute("dfs.class", "org.apache.hadoop.metrics.ganglia.GangliaContext31");
+    contextFactory.setAttribute("dfs.period", "1");
+    contextFactory.setAttribute("dfs.servers", "localhost:" + listener.getPort());
+    
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+    try {
+      if (!listener.getHasData())
+        synchronized(listener) {
+          listener.wait(5*1000); // Wait at most 5 seconds for Ganglia data
+      }
+    } catch (InterruptedException e) {
+      LOG.warn(e);
+    } 
+    assertTrue("Did not recieve Ganglia data", listener.getHasData());
+    cluster.shutdown();
+
+    byte [] hostNameBytes = hostName.getBytes();
+    
+    byte [] xdrBytes = listener.getBytes();
+    
+    // Try to make sure that the received bytes from Ganglia has the correct hostname for this host
+    boolean hasHostname = false;
+    LOG.info("Checking to make sure that the Ganglia data contains host " + hostName);
+    for (int i=0; i<xdrBytes.length-hostNameBytes.length; i++) {
+      hasHostname = true;
+      for (int j=0; j<hostNameBytes.length; j++) {
+        if (xdrBytes[i+j] != hostNameBytes[j]) {
+          hasHostname=false;
+          break;
+        }
+      }
+      if (hasHostname)
+        break;
+    }
+    assertTrue("Did not correctly resolve hostname in Ganglia", hasHostname);
+    
+  }
+
+}
Index: src/java/org/apache/hadoop/metrics/ganglia/GangliaContext.java
===================================================================
--- src/java/org/apache/hadoop/metrics/ganglia/GangliaContext.java	(revision 771522)
+++ src/java/org/apache/hadoop/metrics/ganglia/GangliaContext.java	(working copy)
@@ -71,16 +71,16 @@
     typeTable.put(Float.class, "float");
   }
     
-  private byte[] buffer = new byte[BUFFER_SIZE];
-  private int offset;
+  protected byte[] buffer = new byte[BUFFER_SIZE];
+  protected int offset;
     
-  private List<? extends SocketAddress> metricsServers;
+  protected List<? extends SocketAddress> metricsServers;
   private Map<String,String> unitsTable;
   private Map<String,String> slopeTable;
   private Map<String,String> tmaxTable;
   private Map<String,String> dmaxTable;
     
-  private DatagramSocket datagramSocket;
+  protected DatagramSocket datagramSocket;
     
   /** Creates a new instance of GangliaContext */
   public GangliaContext() {
@@ -132,7 +132,7 @@
     }
   }
     
-  private void emitMetric(String name, String type,  String value) 
+  protected void emitMetric(String name, String type,  String value) 
   throws IOException {
     String units = getUnits(name);
     int slope = getSlope(name);
@@ -156,7 +156,7 @@
     }
   }
     
-  private String getUnits(String metricName) {
+  protected String getUnits(String metricName) {
     String result = unitsTable.get(metricName);
     if (result == null) {
       result = DEFAULT_UNITS;
@@ -164,7 +164,7 @@
     return result;
   }
     
-  private int getSlope(String metricName) {
+  protected int getSlope(String metricName) {
     String slopeString = slopeTable.get(metricName);
     if (slopeString == null) {
       slopeString = DEFAULT_SLOPE; 
@@ -172,7 +172,7 @@
     return ("zero".equals(slopeString) ? 0 : 3); // see gmetric.c
   }
     
-  private int getTmax(String metricName) {
+  protected int getTmax(String metricName) {
     if (tmaxTable == null) {
       return DEFAULT_TMAX;
     }
@@ -185,7 +185,7 @@
     }
   }
     
-  private int getDmax(String metricName) {
+  protected int getDmax(String metricName) {
     String dmaxString = dmaxTable.get(metricName);
     if (dmaxString == null) {
       return DEFAULT_DMAX;
@@ -200,7 +200,7 @@
    * as an int, followed by the bytes of the string, padded if necessary to
    * a multiple of 4.
    */
-  private void xdr_string(String s) {
+  protected void xdr_string(String s) {
     byte[] bytes = s.getBytes();
     int len = bytes.length;
     xdr_int(len);
@@ -222,7 +222,7 @@
   /**
    * Puts an integer into the buffer as 4 bytes, big-endian.
    */
-  private void xdr_int(int i) {
+  protected void xdr_int(int i) {
     buffer[offset++] = (byte)((i >> 24) & 0xff);
     buffer[offset++] = (byte)((i >> 16) & 0xff);
     buffer[offset++] = (byte)((i >> 8) & 0xff);
Index: src/java/org/apache/hadoop/metrics/ganglia/GangliaContext31.java
===================================================================
--- src/java/org/apache/hadoop/metrics/ganglia/GangliaContext31.java	(revision 0)
+++ src/java/org/apache/hadoop/metrics/ganglia/GangliaContext31.java	(revision 0)
@@ -0,0 +1,144 @@
+/*
+ * GangliaContext.java
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics.ganglia;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics.ContextFactory;
+import org.apache.hadoop.net.DNS;
+
+/**
+ * Context for sending metrics to Ganglia version 3.1.x.
+ * 
+ * 3.1.1 has a slightly different wire portal compared to 3.0.x.
+ */
+public class GangliaContext31 extends GangliaContext {
+
+  String hostName = "UNKNOWN.example.com";
+
+  private static final Log LOG = 
+    LogFactory.getLog("org.apache.hadoop.util.GangliaContext31");
+
+  public void init(String contextName, ContextFactory factory) {
+    super.init(contextName, factory);
+
+    LOG.debug("Initializing the GangliaContext31 for Ganglia 3.1 metrics.");
+
+    // Take the hostname from the DNS class.
+
+    Configuration conf = new Configuration();
+
+    if (conf.get("slave.host.name") != null) {
+      hostName = conf.get("slave.host.name");
+    } else {
+      try {
+        hostName = DNS.getDefaultHost(
+          conf.get("dfs.datanode.dns.interface","default"),
+          conf.get("dfs.datanode.dns.nameserver","default"));
+      } catch (UnknownHostException uhe) {
+        LOG.error(uhe);
+    	hostName = "UNKNOWN.example.com";
+      }
+    }
+  }
+
+  protected void emitMetric(String name, String type,  String value) 
+    throws IOException
+  {
+    if (name == null) {
+      LOG.warn("Metric was emitted with no name.");
+      return;
+    } else if (value == null) {
+      LOG.warn("Metric name " + name +" was emitted with a null value.");
+      return;
+    } else if (type == null) {
+      LOG.warn("Metric name " + name + ", value " + value + " has no type.");
+      return;
+    }
+
+    LOG.debug("Emitting metric " + name + ", type " + type + ", value " + 
+      value + " from hostname" + hostName);
+
+    String units = getUnits(name);
+    if (units == null) {
+      LOG.warn("Metric name " + name + ", value " + value
+        + " had 'null' units");
+      units = "";
+    }
+    int slope = getSlope(name);
+    int tmax = getTmax(name);
+    int dmax = getDmax(name);
+    offset = 0;
+    String groupName = name.substring(0,name.lastIndexOf("."));
+
+    // The following XDR recipe was done through a careful reading of
+    // gm_protocol.x in Ganglia 3.1 and carefully examining the output of
+    // the gmetric utility with strace.
+
+    // First we send out a metadata message
+    xdr_int(128);         // metric_id = metadata_msg
+    xdr_string(hostName); // hostname
+    xdr_string(name);     // metric name
+    xdr_int(0);           // spoof = False
+    xdr_string(type);     // metric type
+    xdr_string(name);     // metric name
+    xdr_string(units);    // units
+    xdr_int(slope);       // slope
+    xdr_int(tmax);        // tmax, the maximum time between metrics
+    xdr_int(dmax);        // dmax, the maximum data value
+
+    xdr_int(1);             /*Num of the entries in extra_value field for 
+                              Ganglia 3.1.x*/
+    xdr_string("GROUP");    /*Group attribute*/
+    xdr_string(groupName);  /*Group value*/
+
+    for (SocketAddress socketAddress : metricsServers) {
+      DatagramPacket packet =
+        new DatagramPacket(buffer, offset, socketAddress);
+      datagramSocket.send(packet);
+    }
+
+    // Now we send out a message with the actual value.
+    // Technically, we only need to send out the metadata message once for
+    // each metric, but I don't want to have to record which metrics we did and
+    // did not send.
+    offset = 0;
+    xdr_int(133);         // we are sending a string value
+    xdr_string(hostName); // hostName
+    xdr_string(name);     // metric name
+    xdr_int(0);           // spoof = False
+    xdr_string("%s");     // format field
+    xdr_string(value);    // metric value
+        
+    for (SocketAddress socketAddress : metricsServers) {
+      DatagramPacket packet = 
+        new DatagramPacket(buffer, offset, socketAddress);
+      datagramSocket.send(packet);
+    }
+  }
+
+}
