From a39ce657b49bbb0628c12b0c611bc4517a462825 Mon Sep 17 00:00:00 2001
From: chenheng
Date: Mon, 16 Nov 2015 18:09:30 +0800
Subject: [PATCH] HBASE-14649 Reenable TestHCM.testClusterStatus; disabled because flakey

---
 .../hadoop/hbase/client/ClusterStatusListener.java | 48 ++++++++++++++++++++--
 .../hbase/master/ClusterStatusPublisher.java       |  6 ++-
 .../org/apache/hadoop/hbase/client/TestHCM.java    |  2 +-
 3 files changed, 51 insertions(+), 5 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java
index 5756232..aa2f44e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java
@@ -21,26 +21,32 @@ package org.apache.hadoop.hbase.client;
 import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ChannelFactory;
 import io.netty.buffer.ByteBufInputStream;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
+import io.netty.channel.Channel;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.ChannelException;
 import io.netty.channel.socket.DatagramChannel;
 import io.netty.channel.socket.DatagramPacket;
+import io.netty.channel.socket.InternetProtocolFamily;
 import io.netty.channel.socket.nio.NioDatagramChannel;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.net.Inet6Address;
 import java.net.InetAddress;
 import java.net.NetworkInterface;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.List;
 
+import io.netty.util.internal.StringUtil;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -52,6 +58,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.hbase.util.Threads;
 
@@ -204,11 +211,20 @@ class ClusterStatusListener implements Closeable {
         close();
         throw new IOException("Can't connect to " + mcAddress, e);
       }
-
+      InternetProtocolFamily family;
+      InetAddress localAddress;
+      if (ina instanceof Inet6Address) {
+        localAddress = Addressing.getIp6Address();
+        family = InternetProtocolFamily.IPv6;
+      } else {
+        localAddress = Addressing.getIp4Address();
+        family = InternetProtocolFamily.IPv4;
+      }
+      LOG.info("@@LISTENER@@ localAddress=" + localAddress + ", family=" + family);
       try {
         Bootstrap b = new Bootstrap();
         b.group(group)
-            .channel(NioDatagramChannel.class)
+            .channelFactory(new HBaseDatagramChannelFactory<Channel>(NioDatagramChannel.class, family))
             .option(ChannelOption.SO_REUSEADDR, true)
             .handler(new ClusterStatusHandler());
 
@@ -218,7 +234,7 @@
         throw ExceptionUtil.asInterrupt(e);
       }
 
-      NetworkInterface ni = NetworkInterface.getByInetAddress(Addressing.getIpAddress());
+      NetworkInterface ni = NetworkInterface.getByInetAddress(localAddress);
 
       channel.joinGroup(ina, ni, null, channel.newPromise());
     }
 
@@ -231,6 +247,31 @@ class ClusterStatusListener implements Closeable {
       group.shutdownGracefully();
     }
 
+    private final class HBaseDatagramChannelFactory<T extends Channel> implements ChannelFactory<T> {
+      private final Class<? extends T> clazz;
+      private InternetProtocolFamily family;
+
+      public HBaseDatagramChannelFactory(Class<? extends T> clazz, InternetProtocolFamily family) {
+        this.clazz = clazz;
+        this.family = family;
+      }
+
+      @Override
+      public T newChannel() {
+        try {
+          return ReflectionUtils.instantiateWithCustomCtor(clazz.getName(),
+            new Class[] { InternetProtocolFamily.class }, new Object[] { family });
+
+        } catch (Throwable t) {
+          throw new ChannelException("Unable to create Channel from class " + clazz, t);
+        }
+      }
+
+      @Override
+      public String toString() {
+        return StringUtil.simpleClassName(clazz) + ".class";
+      }
+    }
 
     /**
@@ -258,6 +299,7 @@ class ClusterStatusListener implements Closeable {
       try {
         ClusterStatusProtos.ClusterStatus csp = ClusterStatusProtos.ClusterStatus.parseFrom(bis);
         ClusterStatus ncs = ClusterStatus.convert(csp);
+        LOG.info("@@LISTEN@@ receive:" + ncs.toString());
         receive(ncs);
       } finally {
         bis.close();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java
index e90aae6..546423b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java
@@ -52,6 +52,8 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -77,6 +79,7 @@ import org.apache.hadoop.hbase.util.VersionInfo;
  */
 @InterfaceAudience.Private
 public class ClusterStatusPublisher extends ScheduledChore {
+  private static final Log LOG = LogFactory.getLog(ClusterStatusPublisher.class);
   /**
    * The implementation class used to publish the status. Default is null (no publish).
    * Use org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher to multicast the
@@ -285,7 +288,7 @@ public class ClusterStatusPublisher extends ScheduledChore {
           .channelFactory(new HBaseDatagramChannelFactory<Channel>(NioDatagramChannel.class, family))
           .option(ChannelOption.SO_REUSEADDR, true)
           .handler(new ClusterStatusEncoder(isa));
-
+      LOG.info("@@PUBLISH@@ localAddress=" + localAddress + ", family=" + family);
       try {
         channel = (DatagramChannel) b.bind(new InetSocketAddress(0)).sync().channel();
         channel.joinGroup(ina, ni, null, channel.newPromise()).sync();
@@ -339,6 +342,7 @@ public class ClusterStatusPublisher extends ScheduledChore {
 
     @Override
     public void publish(ClusterStatus cs) {
+      LOG.info("@@PUBLISH@@ publish:" + cs.toString());
       channel.writeAndFlush(cs).syncUninterruptibly();
     }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
index 16465d2..d40b901 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
@@ -213,7 +213,7 @@ public class TestHCM {
   }
 
   // Fails too often! Needs work.  HBASE-12558
-  @Ignore @Test (expected = RegionServerStoppedException.class)
+  @Test (expected = RegionServerStoppedException.class)
   public void testClusterStatus() throws Exception {
     TableName tn =
--
1.9.3 (Apple Git-50)
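
Note: the heart of the change, repeated on both the listener and publisher sides, is deriving the Netty InternetProtocolFamily (and a matching local interface address) from the configured multicast group before the datagram channel is built, so the UDP socket is created for the right IP stack. Below is a minimal, self-contained sketch of that family selection; the class name FamilySelectionSketch and the sample addresses are illustrative only, and a plain InetAddress lookup stands in for HBase's Addressing.getIp4Address()/getIp6Address() helpers.

import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.UnknownHostException;

import io.netty.channel.socket.InternetProtocolFamily;

public final class FamilySelectionSketch {

  /** Mirrors the if/else added in the patch: an IPv6 group picks the IPv6 family, anything else IPv4. */
  static InternetProtocolFamily familyFor(InetAddress multicastGroup) {
    return (multicastGroup instanceof Inet6Address)
        ? InternetProtocolFamily.IPv6
        : InternetProtocolFamily.IPv4;
  }

  public static void main(String[] args) throws UnknownHostException {
    // An IPv4 multicast group resolves to the IPv4 family...
    System.out.println(familyFor(InetAddress.getByName("226.1.1.3")));
    // ...while an IPv6 multicast group such as ff02::1 resolves to the IPv6 family.
    System.out.println(familyFor(InetAddress.getByName("ff02::1")));
  }
}

The patch then hands the chosen family to HBaseDatagramChannelFactory, which constructs NioDatagramChannel through its (InternetProtocolFamily) constructor via ReflectionUtils, instead of letting Bootstrap.channel() use the no-arg constructor and its platform-default family.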