org.apache.kerby
diff --git a/hbase-kafka-model/pom.xml b/hbase-kafka-model/pom.xml
new file mode 100755
index 0000000000..52256b9df5
--- /dev/null
+++ b/hbase-kafka-model/pom.xml
@@ -0,0 +1,326 @@
+
+
+
+ 4.0.0
+
+
+ hbase-build-configuration
+ org.apache.hbase
+ 3.0.0-SNAPSHOT
+ ../hbase-build-configuration
+
+
+
+ hbase-kafka-model
+ Apache HBase - Model Objects for Kafka Proxy
+ Model objects that can be used to consume hbase-kafka-proxy messages
+
+
+
+
+
+ org.apache.avro
+ avro
+
+
+
+
+
+ ${project.basedir}/target/java
+
+
+ src/main/resources/
+
+ hbase-default.xml
+
+
+
+
+
+ src/test/resources/META-INF/
+ META-INF/
+
+ NOTICE
+
+ true
+
+
+
+
+ org.apache.avro
+ avro-maven-plugin
+ ${avro.version}
+
+
+ generate-sources
+
+ schema
+
+
+ ${project.basedir}/src/main/avro/
+ ${project.basedir}/target/java/
+
+ **/*.avro
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-remote-resources-plugin
+
+
+ org.apache.maven.plugins
+ maven-site-plugin
+
+ true
+
+
+
+
+ maven-assembly-plugin
+
+ true
+
+
+
+ maven-surefire-plugin
+
+
+
+ listener
+ org.apache.hadoop.hbase.ResourceCheckerJUnitListener
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-source-plugin
+
+
+ hbase-default.xml
+
+
+
+
+
+
+
+
+ org.eclipse.m2e
+ lifecycle-mapping
+ 1.0.0
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-antrun-plugin
+ [${maven.antrun.version}]
+
+ run
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+ [2.8,)
+
+ build-classpath
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ [3.2,)
+
+ compile
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ apache-release
+
+
+
+ org.apache.maven.plugins
+ maven-resources-plugin
+
+
+ license-javadocs
+ prepare-package
+
+ copy-resources
+
+
+ ${project.build.directory}/apidocs
+
+
+ src/main/javadoc/META-INF/
+ META-INF/
+
+ NOTICE
+
+ true
+
+
+
+
+
+
+
+
+
+
+
+ skipCommonTests
+
+
+ skipCommonTests
+
+
+
+ true
+ true
+
+
+
+
+
+
+ hadoop-2.0
+
+
+
+
+ !hadoop.profile
+
+
+
+
+ org.apache.hadoop
+ hadoop-common
+
+
+
+
+
+ maven-dependency-plugin
+
+
+ create-mrapp-generated-classpath
+ generate-test-resources
+
+ build-classpath
+
+
+
+ ${project.build.directory}/test-classes/mrapp-generated-classpath
+
+
+
+
+
+
+
+
+
+
+ hadoop-3.0
+
+
+ hadoop.profile
+ 3.0
+
+
+
+ 3.0-SNAPSHOT
+
+
+
+ org.apache.hadoop
+ hadoop-common
+
+
+
+
+
+ maven-dependency-plugin
+
+
+ create-mrapp-generated-classpath
+ generate-test-resources
+
+ build-classpath
+
+
+
+ ${project.build.directory}/test-classes/mrapp-generated-classpath
+
+
+
+
+
+
+
+
+
diff --git a/hbase-kafka-model/src/main/avro/HbaseKafkaEvent.avro b/hbase-kafka-model/src/main/avro/HbaseKafkaEvent.avro
new file mode 100755
index 0000000000..cbc0a4ea8b
--- /dev/null
+++ b/hbase-kafka-model/src/main/avro/HbaseKafkaEvent.avro
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+{"namespace": "org.apache.hadoop.hbase.kafka",
+ "type": "record",
+ "name": "HbaseKafkaEvent",
+ "fields": [
+ {"name": "key", "type": "bytes"},
+ {"name": "timestamp", "type": "long" },
+ {"name": "delete", "type": "boolean" },
+ {"name": "value", "type": "bytes"},
+ {"name": "qualifier", "type": "bytes"},
+ {"name": "family", "type": "bytes"},
+ {"name": "table", "type": "bytes"}
+ ]
+}
+
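For reference, a minimal sketch (not part of the patch) of round-tripping the HbaseKafkaEvent class that the avro-maven-plugin generates from this schema, using the same SpecificDatumWriter/SpecificDatumReader pattern the proxy and the dump listener use; all field values are illustrative:

    package org.apache.hadoop.hbase.kafka;

    import java.io.ByteArrayOutputStream;
    import java.nio.ByteBuffer;

    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.io.EncoderFactory;
    import org.apache.avro.specific.SpecificDatumReader;
    import org.apache.avro.specific.SpecificDatumWriter;
    import org.apache.hadoop.hbase.util.Bytes;

    public class HbaseKafkaEventRoundTrip {
      public static void main(String[] args) throws Exception {
        // build an event the same way the proxy does for each replicated Cell
        HbaseKafkaEvent event = new HbaseKafkaEvent();
        event.setTable(ByteBuffer.wrap(Bytes.toBytes("default:MyTable")));
        event.setFamily(ByteBuffer.wrap(Bytes.toBytes("data")));
        event.setQualifier(ByteBuffer.wrap(Bytes.toBytes("hold:1")));
        event.setKey(ByteBuffer.wrap(Bytes.toBytes("row1")));
        event.setValue(ByteBuffer.wrap(Bytes.toBytes("value")));
        event.setDelete(false);
        event.setTimestamp(System.currentTimeMillis());

        // serialize to the byte[] that becomes the Kafka record value
        ByteArrayOutputStream bout = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(bout, null);
        new SpecificDatumWriter<HbaseKafkaEvent>(HbaseKafkaEvent.getClassSchema()).write(event, encoder);
        encoder.flush();

        // decode it back, as the dump listener does for each consumed record
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bout.toByteArray(), null);
        HbaseKafkaEvent copy =
            new SpecificDatumReader<HbaseKafkaEvent>(HbaseKafkaEvent.SCHEMA$).read(null, decoder);
        System.out.println(copy);
      }
    }
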
diff --git a/hbase-kafka-proxy/pom.xml b/hbase-kafka-proxy/pom.xml
new file mode 100755
index 0000000000..15f91026ec
--- /dev/null
+++ b/hbase-kafka-proxy/pom.xml
@@ -0,0 +1,332 @@
+
+
+
+ 4.0.0
+
+ hbase-build-configuration
+ org.apache.hbase
+ 3.0.0-SNAPSHOT
+ ../hbase-build-configuration
+
+
+ hbase-kafka-proxy
+ Apache HBase - Kafka Proxy
+ Proxy that forwards HBase replication events to a Kafka broker
+
+
+
+
+ org.apache.maven.plugins
+ maven-remote-resources-plugin
+
+
+ org.apache.maven.plugins
+ maven-site-plugin
+
+ true
+
+
+
+
+ maven-assembly-plugin
+
+ true
+
+
+
+ maven-surefire-plugin
+
+
+
+ listener
+ org.apache.hadoop.hbase.ResourceCheckerJUnitListener
+
+
+
+
+
+
+
+
+
+ org.eclipse.m2e
+ lifecycle-mapping
+ 1.0.0
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-antrun-plugin
+ [${maven.antrun.version}]
+
+ run
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+ [2.8,)
+
+ build-classpath
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ [3.2,)
+
+ compile
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ org.apache.avro
+ avro
+
+
+ org.apache.hbase
+ hbase-kafka-model
+ compile
+
+
+ org.apache.hbase
+ hbase-common
+
+
+ org.apache.hbase
+ hbase-client
+
+
+ org.apache.hbase
+ hbase-server
+
+
+ org.apache.hbase
+ hbase-annotations
+ ${project.version}
+
+
+ org.apache.kafka
+ kafka-clients
+ 0.9.0.1
+ provided
+
+
+
+
+ commons-logging
+ commons-logging
+
+
+ commons-codec
+ commons-codec
+ compile
+
+
+ org.apache.commons
+ commons-lang3
+
+
+ org.apache.commons
+ commons-collections4
+ compile
+
+
+ commons-io
+ commons-io
+ compile
+
+
+ com.google.protobuf
+ protobuf-java
+
+
+ org.apache.commons
+ commons-crypto
+
+
+
+
+
+
+ apache-release
+
+
+
+ org.apache.maven.plugins
+ maven-resources-plugin
+
+
+ license-javadocs
+ prepare-package
+
+ copy-resources
+
+
+ ${project.build.directory}/apidocs
+
+
+ src/main/javadoc/META-INF/
+ META-INF/
+
+ NOTICE
+
+ true
+
+
+
+
+
+
+
+
+
+
+
+ skipCommonTests
+
+
+ skipCommonTests
+
+
+
+ true
+ true
+
+
+
+
+
+
+ hadoop-2.0
+
+
+
+
+ !hadoop.profile
+
+
+
+
+ org.apache.hadoop
+ hadoop-common
+
+
+
+
+
+ maven-dependency-plugin
+
+
+ create-mrapp-generated-classpath
+ generate-test-resources
+
+ build-classpath
+
+
+
+ ${project.build.directory}/test-classes/mrapp-generated-classpath
+
+
+
+
+
+
+
+
+
+
+
+ hadoop-3.0
+
+
+ hadoop.profile
+ 3.0
+
+
+
+ 3.0-SNAPSHOT
+
+
+
+ org.apache.hadoop
+ hadoop-common
+
+
+
+
+
+ maven-dependency-plugin
+
+
+ create-mrapp-generated-classpath
+ generate-test-resources
+
+ build-classpath
+
+
+
+ ${project.build.directory}/test-classes/mrapp-generated-classpath
+
+
+
+
+
+
+
+
+
+
diff --git a/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DropRule.java b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DropRule.java
new file mode 100755
index 0000000000..6f51431563
--- /dev/null
+++ b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DropRule.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.kafka;
+
+/**
+ * Rule that indicates the Cell should not be replicated
+ */
+public class DropRule extends Rule {
+ public DropRule() {
+ }
+}
diff --git a/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DumpToStringListener.java b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DumpToStringListener.java
new file mode 100755
index 0000000000..e42d045a48
--- /dev/null
+++ b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DumpToStringListener.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.kafka;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+
+
+/**
+ * Connects to Kafka and reads from the passed-in topics. Parses each message into an Avro object
+ * and dumps it to the console.
+ */
+public final class DumpToStringListener {
+ private static final Log LOG = LogFactory.getLog(DumpToStringListener.class);
+
+ private DumpToStringListener(){
+
+ }
+
+ public static void main(String[] args) {
+ LOG.info("***** STARTING service '" + DumpToStringListener.class.getSimpleName() + "' *****");
+ VersionInfo.logVersion();
+
+ Options options = new Options();
+ options.addOption("k", "kafkabrokers", true, "Kafka Brokers " +
+ "(comma delimited)");
+ options.addOption("t", "kafkatopics", true, "Kaf" +
+ "ka Topics to subscribe to (comma delimited)");
+ CommandLine commandLine = null;
+ try {
+ commandLine = new DefaultParser().parse(options, args);
+ } catch (ParseException e) {
+ LOG.error("Could not parse: ", e);
+ printUsageAndExit(options, -1);
+ }
+ SpecificDatumReader<HbaseKafkaEvent> dreader =
+ new SpecificDatumReader<>(HbaseKafkaEvent.SCHEMA$);
+
+ String topic = commandLine.getOptionValue('t');
+
+ Properties props = new Properties();
+ props.put("bootstrap.servers", commandLine.getOptionValue('k'));
+ props.put("group.id", "hbase kafka test tool");
+ props.put("key.deserializer", ByteArrayDeserializer.class.getName());
+ props.put("value.deserializer", ByteArrayDeserializer.class.getName());
+ final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
+ consumer.subscribe(Arrays.stream(topic.split(",")).collect(Collectors.toList()));
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ consumer.close();
+ }
+ });
+
+ while (true) {
+ ConsumerRecords<byte[], byte[]> records = consumer.poll(10000);
+ Iterator<ConsumerRecord<byte[], byte[]>> it = records.iterator();
+ while (it.hasNext()) {
+ ConsumerRecord<byte[], byte[]> record = it.next();
+ BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(record.value(), null);
+ try {
+ HbaseKafkaEvent event = dreader.read(null, decoder);
+ LOG.info("key :" + Bytes.toString(record.key()) + " value " + event);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ private static void printUsageAndExit(Options options, int exitCode) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("hbase " + DumpToStringListener.class.getName(), "", options,
+ "\n[--kafkabrokers ] " +
+ "[-k ] \n", true);
+ System.exit(exitCode);
+ }
+
+}
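A hedged usage sketch for this listener (the exact launcher depends on how the kafka-clients and avro jars are put on the HBase classpath); the broker and topic names are placeholders:

    hbase org.apache.hadoop.hbase.kafka.DumpToStringListener \
        --kafkabrokers broker1:9092,broker2:9092 \
        --kafkatopics mytopic
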
diff --git a/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaProxyServer.java b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaProxyServer.java
new file mode 100755
index 0000000000..95b37b448d
--- /dev/null
+++ b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaProxyServer.java
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.kafka;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.builder.ReflectionToStringBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+
+/**
+ * HBase to Kafka bridge.
+ *
+ * Starts up a region server and receives replication events, just like a peer
+ * cluster member would. It takes the events and, cell by cell, determines how to
+ * route them (see kafka-route-rules.xml).
+ */
+public class KafkaProxyServer extends Configured implements Tool {
+ public static final char INTERNAL_HYPHEN_REPLACEMENT = '\u1400';
+
+ private final Log LOG = LogFactory.getLog(getClass());
+
+ public static void main(String[] args) {
+ try {
+ ToolRunner.run(new KafkaProxyServer(), args);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private static void printUsageAndExit(Options options, int exitCode) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("hbase kafkaproxy start", "", options,
+ "\nTo run the kafka proxy as a daemon, execute " +
+ "hbase-daemon.sh start|stop kafkaproxy " +
+ "[--kafkabrokers ] " +
+ "[-k ] " +
+ "[--peername name of hbase peer to use (defaults to hbasekafka)] " +
+ "[-p name of hbase peer to use (defaults to hbasekafka)] " +
+ "[--auto auto create peer] " +
+ "[-a auto create peer] \n", true);
+ System.exit(exitCode);
+ }
+
+ @Override
+ public int run(String[] args) {
+ LOG.info("***** STARTING service '" + KafkaProxyServer.class.getSimpleName() + "' *****");
+ VersionInfo.logVersion();
+
+ Options options = new Options();
+
+ options.addOption("k", "kafkabrokers", true,
+ "Kafka Brokers (comma delimited)");
+ options.addOption("p", "peername", true,
+ "Name of hbase peer");
+ options.addOption("a", "autopeer", false,
+ "Create a peer auotmatically to the hbase cluster");
+ options.addOption("f", "kafkaproperties", false,
+ "Path to properties file that has the kafka connection properties");
+
+ Option o = new Option("r", "routerulesfile", true, "file that has routing rules");
+ o.setRequired(true);
+ options.addOption(o);
+
+ CommandLine commandLine = null;
+ try {
+ commandLine = new DefaultParser().parse(options, args);
+ } catch (ParseException e) {
+ LOG.error("Could not parse: ", e);
+ printUsageAndExit(options, -1);
+ }
+
+ Configuration conf = HBaseConfiguration.create(getConf());
+ String zookeeperQ = conf.get("hbase.zookeeper.quorum") + ":" +
+ conf.get("hbase.zookeeper.property.clientPort");
+ String kafkaServers = commandLine.getOptionValue('k');
+
+ String routeRulesFile = commandLine.getOptionValue('r');
+ TopicRoutingRules rrules = new TopicRoutingRules();
+ try (FileInputStream fin = new FileInputStream(routeRulesFile)) {
+ rrules.parseRules(fin);
+ } catch (Exception e) {
+ LOG.error("Rule file " + routeRulesFile + " not found or invalid");
+ System.exit(-1);
+ }
+
+ if ((commandLine.getOptionValue('f') == null) && (commandLine.getOptionValue('k') == null)) {
+ System.err.println("You must provide a list of kafka brokers or a properties " +
+ "file with the connection properties");
+ System.exit(-1);
+ }
+
+ String peerName = commandLine.getOptionValue('p');
+ if (peerName == null) {
+ peerName = "hbasekafka";
+ }
+
+ boolean createPeer = commandLine.hasOption('a');
+
+ LOG.info("using peer named " + peerName + " automatically create? " + createPeer);
+
+ peerName = toInternalSubscriptionName(peerName);
+
+ ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(20000, 20);
+
+ try (CuratorFramework zk = CuratorFrameworkFactory.newClient(zookeeperQ, retryPolicy);
+ ) {
+
+ zk.start();
+
+ final String internalName = toInternalSubscriptionName(peerName);
+
+ String basePath = "/hbasekafkaproxy";
+ // always gives the same uuid for the same name
+ UUID uuid = UUID.nameUUIDFromBytes(Bytes.toBytes(internalName));
+
+ if (zk.checkExists().forPath(basePath + "/hbaseid") == null) {
+ zk.create().creatingParentsIfNeeded().forPath(basePath +
+ "/hbaseid", Bytes.toBytes(uuid.toString()));
+ }
+
+ if (zk.checkExists().forPath(basePath + "/rs") == null) {
+ zk.create().forPath(basePath + "/rs");
+ }
+
+ // look for and connect to the peer
+ checkForOrCreateReplicationPeer(conf, zk, basePath, internalName, createPeer);
+
+ Properties configProperties = new Properties();
+
+ if (commandLine.getOptionValue('f') != null) {
+ try (FileInputStream fs = new FileInputStream(
+ new File(commandLine.getOptionValue('f')))) {
+ configProperties.load(fs);
+ }
+ } else {
+ configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
+ }
+
+ configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.ByteArraySerializer");
+ configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.ByteArraySerializer");
+ Producer<byte[], byte[]> producer = new KafkaProducer<>(configProperties);
+
+ ProxyHRegionServer proxy = new ProxyHRegionServer(zk, conf, peerName, producer, rrules);
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ LOG.info("***** STOPPING service '" +
+ KafkaProxyServer.class.getSimpleName() + "' *****");
+ try (
+ // disable peer if we added it with the -a flag
+ Connection con = ConnectionFactory.createConnection(conf);
+ Admin admin = con.getAdmin();) {
+ LOG.info("disable peer " + internalName + " since it was sdded with the -a flag");
+ admin.disableReplicationPeer(internalName);
+ } catch (Exception e) {
+ LOG.error("unable to pause peer " + internalName);
+ e.printStackTrace();
+ System.exit(1);
+ }
+
+ proxy.stop();
+ zk.close();
+ }
+ });
+
+ proxy.start();
+
+ while (true) {
+ try {
+ Thread.sleep(60000);
+ } catch (Exception e) {
+ }
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return 0;
+
+ }
+
+ /**
+ * Poll for the configured peer or create it if it does not exist
+ * (controlled by createIfMissing)
+ * @param hbaseConf the hbase configuration
+ * @param zk CuratorFramework object
+ * @param basePath base znode.
+ * @param internalName id of the peer to check for/create.
+ * @param createIfMissing if the peer doesn't exist, create it and connect to it.
+ */
+ public void checkForOrCreateReplicationPeer(Configuration hbaseConf,
+ CuratorFramework zk, String basePath,
+ String internalName, boolean createIfMissing) {
+ try (Connection hcon = ConnectionFactory.createConnection(hbaseConf);
+ Admin admin = hcon.getAdmin()) {
+
+ boolean peerThere = false;
+ boolean peerEnabled = true;
+
+ while (!peerThere) {
+ LOG.info("looking for peer " + internalName + " add_peer using " + basePath + " as znode");
+ for (ReplicationPeerDescription desc : admin.listReplicationPeers()) {
+ if (StringUtils.equals(desc.getPeerId(), internalName)) {
+
+ LOG.info("Found peer " + ReflectionToStringBuilder.reflectionToString(
+ admin.getReplicationPeerConfig(internalName)));
+
+ peerEnabled = desc.isEnabled();
+
+ peerThere = true;
+
+ if (peerEnabled) {
+ break;
+ } else {
+ LOG.info("Found peer " + internalName + " but it is disabled, " +
+ "enable the peer to allow the proxy to receive events");
+ }
+ }
+ }
+
+ if ((createIfMissing) && (peerThere)) {
+ // see if the peer is disabled
+ if (!peerEnabled) {
+ LOG.info("enable peer," + internalName + " autocreate is set");
+ admin.enableReplicationPeer(internalName);
+ peerEnabled = true;
+ }
+ } else if ((createIfMissing) && (!peerThere)) {
+ ReplicationPeerConfig rconf = new ReplicationPeerConfig();
+ // get the current cluster's ZK config
+ String zookeeperQ = hbaseConf.get("hbase.zookeeper.quorum") +
+ ":" + hbaseConf.get("hbase.zookeeper.property.clientPort");
+ String znodePath = zookeeperQ + ":/hbasekafkaproxy";
+ rconf.setClusterKey(znodePath);
+ admin.addReplicationPeer(internalName, rconf);
+ peerThere = true;
+ peerEnabled = true;
+ }
+
+ if ((peerThere) && (peerEnabled)) {
+ break;
+ }
+ Thread.sleep(5000);
+ }
+
+ LOG.info("found replication peer " + internalName);
+
+
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ static String toInternalSubscriptionName(String subscriptionName) {
+ if (subscriptionName.indexOf(INTERNAL_HYPHEN_REPLACEMENT, 0) != -1) {
+ throw new IllegalArgumentException("Subscription name cannot contain character \\U1400");
+ }
+ return subscriptionName.replace('-', INTERNAL_HYPHEN_REPLACEMENT);
+ }
+
+
+}
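A hedged example of starting the proxy as a daemon, following the usage text printed above; the broker list, peer name, and rule-file path are placeholders:

    hbase-daemon.sh start kafkaproxy \
        -a \
        -p hbasekafka \
        -k broker1:9092,broker2:9092 \
        -r /etc/hbase/conf/kafka-route-rules.xml

With -a the proxy creates (or re-enables) the replication peer itself; without it, the peer must already exist and be enabled before events will flow.
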
diff --git a/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/ProxyHRegionServer.java b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/ProxyHRegionServer.java
new file mode 100755
index 0000000000..21ffc69bdf
--- /dev/null
+++ b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/ProxyHRegionServer.java
@@ -0,0 +1,514 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hbase.kafka;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.PriorityFunction;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.ipc.SimpleRpcServer;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.util.DNS;
+import org.apache.hadoop.hbase.util.Strings;
+import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.zookeeper.CreateMode;
+
+/**
+ * Stub region server that receives the replication events and, if they pass the routing rules,
+ * forwards them to the configured Kafka topic(s).
+ */
+public class ProxyHRegionServer
+ implements AdminProtos.AdminService.BlockingInterface, Server, PriorityFunction {
+
+ private CuratorFramework zk;
+ private Configuration hbaseConf;
+ private RpcServer rpcServer;
+ private String zkNodePath;
+ private String peerName;
+ private ServerName serverName;
+ private boolean running = true;
+ private Log log = LogFactory.getLog(getClass());
+ private Producer<byte[], byte[]> producer;
+ private DatumWriter<HbaseKafkaEvent> avroWriter =
+ new SpecificDatumWriter<>(HbaseKafkaEvent.getClassSchema());
+ private TopicRoutingRules routingRules;
+
+ public ProxyHRegionServer() {
+
+ }
+
+ public ProxyHRegionServer(CuratorFramework zk, Configuration hbaseConf, String peerName,
+ Producer<byte[], byte[]> producer, TopicRoutingRules routeRules) {
+ this.zk = zk;
+ this.hbaseConf = hbaseConf;
+ this.peerName = peerName;
+ this.producer = producer;
+ this.routingRules = routeRules;
+ }
+
+ /**
+ * start the proxy region server
+ * @throws Exception the proxy could not start
+ */
+ public void start() throws Exception {
+ try {
+ String hostName = Strings.domainNamePointerToHostName(
+ DNS.getDefaultHost(hbaseConf.get("hbase.regionserver.dns.interface", "default"),
+ hbaseConf.get("hbase.regionserver.dns.nameserver", "default")));
+
+ this.producer = producer;
+
+ log.info("listening on host is " + hostName);
+
+ InetSocketAddress initialIsa = new InetSocketAddress(hostName, 0);
+ if (initialIsa.getAddress() == null) {
+ throw new IllegalArgumentException("Failed resolve of " + initialIsa);
+ }
+ String name = "regionserver/" + initialIsa.toString();
+ this.rpcServer = new SimpleRpcServer(this, name, getServices(), initialIsa, hbaseConf,
+ new FifoRpcScheduler(hbaseConf,
+ hbaseConf.getInt("hbase.regionserver.handler.count", 10)));
+
+ this.serverName = ServerName.valueOf(hostName, rpcServer.getListenerAddress().getPort(),
+ System.currentTimeMillis());
+
+ rpcServer.start();
+ // Publish our existence in ZooKeeper
+
+ if (zk.checkExists().forPath("/hbasekafkaproxy/rs") == null) {
+ zk.create().forPath("/hbasekafkaproxy/rs");
+ }
+
+ zkNodePath = "/hbasekafkaproxy/rs/" + serverName.getServerName();
+ zk.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(zkNodePath);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new IOException(e);
+ }
+ this.running = true;
+ }
+
+ /**
+ * stop the proxy server
+ */
+ public void stop() {
+ try {
+ rpcServer.stop();
+ producer.flush();
+ producer.close();
+ zk.delete().deletingChildrenIfNeeded().forPath(zkNodePath);
+ } catch (Exception e) {
+ log.error("exception deleting znode " + zkNodePath, e);
+ }
+
+ }
+
+ /**
+ * Handle the replication events that are coming in.
+ * @param controller contains the cells for the mutations
+ * @param request contains the WALEntry objects
+ * @return ReplicateWALEntryResponse replicateWALEntry on success
+ * @throws ServiceException if entries could not be processed
+ */
+ @Override
+ public ReplicateWALEntryResponse replicateWALEntry(RpcController controller,
+ ReplicateWALEntryRequest request) throws ServiceException {
+
+ try {
+ List<AdminProtos.WALEntry> entries = request.getEntryList();
+ CellScanner cells = ((HBaseRpcController) controller).cellScanner();
+
+ int i = 0;
+
+ for (final AdminProtos.WALEntry entry : entries) {
+ int count = entry.getAssociatedCellCount();
+
+ for (int y = 0; y < count; y++) {
+ if (!cells.advance()) {
+ throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
+ }
+
+ TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray());
+ Cell cell = cells.current();
+ byte[] columnFamily = CellUtil.cloneFamily(cell);
+ byte[] qualifier = CellUtil.cloneQualifier(cell);
+
+ List<String> topics = null;
+
+ if (!routingRules.isExclude(table, columnFamily, qualifier)) {
+ topics = routingRules.getTopics(table, columnFamily, qualifier);
+ }
+
+ if (!CollectionUtils.isEmpty(topics)) {
+ byte[] key = CellUtil.cloneRow(cell);
+ HbaseKafkaEvent event = new HbaseKafkaEvent();
+ event.setKey(ByteBuffer.wrap(key));
+ event.setDelete(CellUtil.isDelete(cell));
+ event.setQualifier(ByteBuffer.wrap(qualifier));
+ event.setFamily(ByteBuffer.wrap(columnFamily));
+ event.setTable(ByteBuffer.wrap(entry.getKey().getTableName().toByteArray()));
+ event.setValue(ByteBuffer.wrap(CellUtil.cloneValue(cell)));
+ event.setTimestamp(cell.getTimestamp());
+ pushKafkaMessage(event, key, topics);
+ }
+ }
+ i++;
+ }
+
+ return AdminProtos.ReplicateWALEntryResponse.newBuilder().build();
+ } catch (Exception ie) {
+ throw new ServiceException(ie);
+ }
+ }
+
+ /**
+ * push the message to the topic(s)
+ * @param message message to publish
+ * @param key key for kafka topic
+ * @param topics topics to publish to
+ * @throws Exception if message could not be sent
+ */
+ public void pushKafkaMessage(HbaseKafkaEvent message, byte[] key, List<String> topics)
+ throws Exception {
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(bout, null);
+ avroWriter.write(message, encoder);
+ encoder.flush();
+
+ byte[] value = bout.toByteArray();
+
+ for (String topic : topics) {
+ ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, key, value);
+ producer.send(record);
+ }
+ }
+
+ /**
+ * Returns wrapper needed to use RPC service
+ * @return Wrapper needed to use RPC service
+ */
+ private List<RpcServer.BlockingServiceAndInterface> getServices() {
+ List<RpcServer.BlockingServiceAndInterface> bssi =
+ new ArrayList<>(1);
+
+ bssi.add(new RpcServer.BlockingServiceAndInterface(
+ (BlockingService) AdminProtos.AdminService.newReflectiveBlockingService(this),
+ AdminProtos.AdminService.BlockingInterface.class));
+ return bssi;
+ }
+
+ /**
+ * set the routing rules (used for unit tests)
+ * @param routingRules routing rules to use
+ */
+ public void setRoutingRules(TopicRoutingRules routingRules) {
+ this.routingRules = routingRules;
+ }
+
+ /**
+ * set the kafka producer (used for unit tests)
+ * @param producer producer to use
+ */
+ public void setProducer(Producer<byte[], byte[]> producer) {
+ this.producer = producer;
+ }
+
+ @Override
+ public AdminProtos.ClearCompactionQueuesResponse clearCompactionQueues(RpcController arg0,
+ ClearCompactionQueuesRequest arg1) throws ServiceException {
+
+ throw new IllegalStateException("not implemented!");
+
+ }
+
+ @Override
+ public QuotaProtos.GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller,
+ QuotaProtos.GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
+ throw new IllegalStateException("not implemented!");
+ }
+
+ @Override
+ public AdminProtos.ExecuteProceduresResponse executeProcedures(RpcController controller,
+ AdminProtos.ExecuteProceduresRequest request) throws ServiceException {
+ throw new IllegalStateException("not implemented!");
+ }
+
+ @Override
+ public void abort(String why, Throwable e) {
+
+ throw new IllegalStateException("not implemented!");
+
+ }
+
+ @Override
+ public boolean isAborted() {
+
+ throw new IllegalStateException("not implemented!");
+ }
+
+ @Override
+ public void stop(String why) {
+ throw new IllegalStateException("not implemented!");
+
+ }
+
+ @Override
+ public boolean isStopped() {
+
+ throw new IllegalStateException("not implemented!");
+ }
+
+ @Override
+ public int getPriority(RequestHeader header, Message param, User user) {
+
+ throw new IllegalStateException("not implemented!");
+ }
+
+ @Override
+ public long getDeadline(RequestHeader header, Message param) {
+
+ throw new IllegalStateException("not implemented!");
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+
+ throw new IllegalStateException("not implemented!");
+ }
+
+ @Override
+ public ZooKeeperWatcher getZooKeeper() {
+
+ throw new IllegalStateException("not implemented!");
+ }
+
+ @Override
+ public Connection getConnection() {
+
+ throw new IllegalStateException("not implemented!");
+ }
+
+ @Override
+ public ClusterConnection getClusterConnection() {
+
+ throw new IllegalStateException("not implemented!");
+ }
+
+ @Override
+ public MetaTableLocator getMetaTableLocator() {
+
+ throw new IllegalStateException("not implemented!");
+ }
+
+ @Override
+ public ServerName getServerName() {
+
+ throw new IllegalStateException("not implemented!");
+ }
+
+ @Override
+ public CoordinatedStateManager getCoordinatedStateManager() {
+
+ throw new IllegalStateException("not implemented!");
+ }
+
+ @Override
+ public ChoreService getChoreService() {
+
+ throw new IllegalStateException("not implemented!");
+ }
+
+ @Override
+ public GetRegionInfoResponse getRegionInfo(RpcController controller, GetRegionInfoRequest request)
+ throws ServiceException {
+
+ throw new IllegalStateException("not implemented!");
+ }
+
+ @Override
+ public GetStoreFileResponse getStoreFile(RpcController controller, GetStoreFileRequest request)
+ throws ServiceException {
+
+ throw new IllegalStateException("not implemented!");
+ }
+
+ @Override
+ public GetOnlineRegionResponse getOnlineRegion(RpcController controller,
+ GetOnlineRegionRequest request) throws ServiceException {
+
+ throw new IllegalStateException("not implemented!");
+ }
+
+ @Override
+ public OpenRegionResponse openRegion(RpcController controller, OpenRegionRequest request)
+ throws ServiceException {
+
+ throw new IllegalStateException("not implemented!");
+ }
+
+ @Override
+ public WarmupRegionResponse warmupRegion(RpcController controller, WarmupRegionRequest request)
+ throws ServiceException {
+
+ throw new IllegalStateException("not implemented!");
+ }
+
+ @Override
+ public CloseRegionResponse closeRegion(RpcController controller, CloseRegionRequest request)
+ throws ServiceException {
+
+ throw new IllegalStateException("not implemented!");
+ }
+
+ @Override
+ public FlushRegionResponse flushRegion(RpcController controller, FlushRegionRequest request)
+ throws ServiceException {
+
+ throw new IllegalStateException("not implemented!");
+ }
+
+ @Override
+ public CompactRegionResponse compactRegion(RpcController controller, CompactRegionRequest request)
+ throws ServiceException {
+
+ throw new IllegalStateException("not implemented!");
+ }
+
+ @Override
+ public ReplicateWALEntryResponse replay(RpcController controller,
+ ReplicateWALEntryRequest request) throws ServiceException {
+
+ throw new IllegalStateException("not implemented!");
+ }
+
+ @Override
+ public RollWALWriterResponse rollWALWriter(RpcController controller, RollWALWriterRequest request)
+ throws ServiceException {
+
+ throw new IllegalStateException("not implemented!");
+ }
+
+ @Override
+ public GetServerInfoResponse getServerInfo(RpcController controller, GetServerInfoRequest request)
+ throws ServiceException {
+
+ throw new IllegalStateException("not implemented!");
+ }
+
+ @Override
+ public StopServerResponse stopServer(RpcController controller, StopServerRequest request)
+ throws ServiceException {
+
+ throw new IllegalStateException("not implemented!");
+ }
+
+ @Override
+ public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
+ UpdateFavoredNodesRequest request) throws ServiceException {
+
+ throw new IllegalStateException("not implemented!");
+ }
+
+ @Override
+ public UpdateConfigurationResponse updateConfiguration(RpcController controller,
+ UpdateConfigurationRequest request) throws ServiceException {
+
+ throw new IllegalStateException("not implemented!");
+ }
+
+ @Override
+ public GetRegionLoadResponse getRegionLoad(RpcController controller, GetRegionLoadRequest request)
+ throws ServiceException {
+
+ throw new IllegalStateException("not implemented!");
+ }
+
+ @Override
+ public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller,
+ ClearRegionBlockCacheRequest request) throws ServiceException {
+
+ throw new IllegalStateException("not implemented!");
+ }
+
+}
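A minimal, hedged test-style sketch of pushKafkaMessage built on the setters marked "used for unit tests" above; it relies on kafka-clients' MockProducer and the generated HbaseKafkaEvent class, and is not part of the patch:

    package org.apache.hadoop.hbase.kafka;

    import java.nio.ByteBuffer;
    import java.util.Arrays;

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.kafka.clients.producer.MockProducer;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class ProxyPushSketch {
      public static void main(String[] args) throws Exception {
        // MockProducer records sends in memory instead of contacting a broker
        MockProducer<byte[], byte[]> producer =
            new MockProducer<>(true, new ByteArraySerializer(), new ByteArraySerializer());

        ProxyHRegionServer proxy = new ProxyHRegionServer();
        proxy.setProducer(producer);

        // an event as the proxy would build it from a replicated Cell
        HbaseKafkaEvent event = new HbaseKafkaEvent();
        event.setKey(ByteBuffer.wrap(Bytes.toBytes("row1")));
        event.setTable(ByteBuffer.wrap(Bytes.toBytes("default:MyTable")));
        event.setFamily(ByteBuffer.wrap(Bytes.toBytes("data")));
        event.setQualifier(ByteBuffer.wrap(Bytes.toBytes("hold:1")));
        event.setValue(ByteBuffer.wrap(Bytes.toBytes("v")));
        event.setDelete(false);
        event.setTimestamp(System.currentTimeMillis());

        // one Avro-serialized copy of the event is sent per topic in the list
        proxy.pushKafkaMessage(event, Bytes.toBytes("row1"), Arrays.asList("mytopic"));
        System.out.println("records sent: " + producer.history().size()); // expected: 1
      }
    }
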
diff --git a/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/Rule.java b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/Rule.java
new file mode 100755
index 0000000000..6513730604
--- /dev/null
+++ b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/Rule.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.kafka;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Implements the matching logic for a rule
+ */
+public abstract class Rule {
+ TableName tableName;
+ private byte [] columnFamily;
+ private byte [] qualifier;
+
+ boolean qualifierStartsWith = false;
+ boolean qualifierEndsWith = false;
+
+ byte []ast = Bytes.toBytes("*");
+
+ /**
+ * Indicates if the table,column family, and qualifier match the rule
+ * @param tryTable table name to test
+ * @param tryColumFamily column family to test
+ * @param tryQualifier qualifier to test
+ * @return true if values match the rule
+ */
+ public boolean match(TableName tryTable, byte [] tryColumFamily, byte [] tryQualifier) {
+ boolean tableMatch = tableMatch(tryTable);
+ boolean columnFamilyMatch = columnFamilyMatch(tryColumFamily);
+ boolean qualfierMatch = qualifierMatch(tryQualifier);
+
+ return tableMatch && columnFamilyMatch && qualfierMatch;
+ }
+
+ /**
+ * Test if the qualifier matches
+ * @param tryQualifier qualifier to test
+ * @return true if the qualifier matches
+ */
+ public boolean qualifierMatch(byte [] tryQualifier) {
+
+ if (qualifier != null) {
+ if (qualifierStartsWith && qualifierEndsWith) {
+ return (startsWith(tryQualifier, this.qualifier) || endsWith(tryQualifier, this.qualifier));
+ } else if (qualifierStartsWith) {
+ return startsWith(tryQualifier, this.qualifier);
+ } else if (qualifierEndsWith) {
+ return endsWith(tryQualifier, this.qualifier);
+ } else {
+ return Bytes.equals(this.qualifier, tryQualifier);
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Test if the column family matches the rule
+ * @param tryColumFamily column family to test
+ * @return true if the column family matches the rule
+ */
+ public boolean columnFamilyMatch(byte [] tryColumFamily) {
+ if (columnFamily != null) {
+ return Bytes.equals(this.columnFamily, tryColumFamily);
+ }
+ return true;
+ }
+
+ /**
+ * Test if the table matches the table in the rule
+ * @param tryTable table name to test
+ * @return true if the table matches the rule
+ */
+ public boolean tableMatch(TableName tryTable) {
+ if (tableName == null) {
+ return true;
+ }
+ return (tryTable.equals(this.tableName));
+ }
+
+ /**
+ * set the column family for the rule
+ * @param columnFamily column family to set
+ */
+ public void setColumnFamily(byte [] columnFamily) {
+ this.columnFamily = columnFamily;
+ }
+
+ /**
+ * set the qualifier value for the rule
+ * @param qualifier qualifier to set
+ */
+ public void setQualifier(byte []qualifier) {
+ this.qualifier = qualifier;
+ if (startsWith(qualifier, ast)) {
+ qualifierEndsWith = true;
+ this.qualifier = ArrayUtils.subarray(this.qualifier, ast.length, this.qualifier.length);
+ }
+ if (endsWith(qualifier, ast)) {
+ qualifierStartsWith = true;
+ this.qualifier = ArrayUtils.subarray(this.qualifier, 0, this.qualifier.length - ast.length);
+ }
+ if ((qualifierStartsWith) || (qualifierEndsWith)) {
+ if (this.qualifier.length == 0) {
+ this.qualifier = null;
+ }
+ }
+
+ }
+
+ /**
+ * Tests if data starts with startsWith
+ * @param data byte array to test
+ * @param startsWith array that we want to see if data starts with
+ * @return true if data starts with startsWith
+ */
+ public static boolean startsWith(byte [] data, byte [] startsWith) {
+ if (startsWith.length > data.length) {
+ return false;
+ }
+
+ if (startsWith.length == data.length) {
+ return Bytes.equals(data, startsWith);
+ }
+
+ for (int i = 0; i < startsWith.length; i++) {
+ if (startsWith[i] != data[i]) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Tests if data ends with endsWith
+ * @param data byte array to test
+ * @param endsWith array that we want to see if data ends with
+ * @return true if data ends with endsWith
+ */
+ public static boolean endsWith(byte [] data, byte [] endsWith) {
+ if (endsWith.length > data.length) {
+ return false;
+ }
+
+ if (endsWith.length == data.length) {
+ return Bytes.equals(data, endsWith);
+ }
+
+ int endStart = data.length - endsWith.length;
+
+ for (int i = 0; i < endsWith.length; i++) {
+ //if (endsWith[i]!=data[(data.length-1)-(endsWith.length+i)]){
+ if (endsWith[i] != data[endStart + i]) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * get the table for the rule
+ * @return table name for the rule
+ */
+ public TableName getTableName() {
+ return tableName;
+ }
+
+ /**
+ * set the table for the rule
+ * @param tableName to set
+ */
+ public void setTableName(TableName tableName) {
+ this.tableName = tableName;
+ }
+
+ /**
+ * get the column family for the rule
+ * @return column family
+ */
+ public byte[] getColumnFamily() {
+ return columnFamily;
+ }
+
+ /**
+ * get the qualifier for the rule
+ * @return qualifier
+ */
+ public byte[] getQualifier() {
+ return qualifier;
+ }
+
+
+ /**
+ * indicates if the qualifier is a wildcard like *foo
+ * @return true if rule is like *foo
+ */
+ public boolean isQualifierEndsWith() {
+ return qualifierEndsWith;
+ }
+
+ /**
+ * indicates if the qualifier is a wildcard like foo*
+ * @return true if rule is like foo*
+ */
+ public boolean isQualifierStartsWith() {
+ return qualifierStartsWith;
+
+ }
+}
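A short, hedged illustration of the qualifier wildcard handling, using the concrete TopicRule subclass added later in this patch; names and values are illustrative:

    package org.apache.hadoop.hbase.kafka;

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.util.Bytes;

    public class RuleWildcardSketch {
      public static void main(String[] args) {
        TopicRule rule = new TopicRule("mytopic");
        rule.setTableName(TableName.valueOf("default:MyTable"));
        rule.setColumnFamily(Bytes.toBytes("data"));
        // a trailing '*' means "qualifier starts with"; a leading '*' means "ends with"
        rule.setQualifier(Bytes.toBytes("hold:*"));

        TableName table = TableName.valueOf("default:MyTable");
        System.out.println(rule.match(table, Bytes.toBytes("data"), Bytes.toBytes("hold:123")));  // true
        System.out.println(rule.match(table, Bytes.toBytes("data"), Bytes.toBytes("other:123"))); // false
      }
    }
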
diff --git a/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/TopicRoutingRules.java b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/TopicRoutingRules.java
new file mode 100755
index 0000000000..69f00192e6
--- /dev/null
+++ b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/TopicRoutingRules.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.kafka;
+
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+/**
+ * The topic routing/drop rules.
+ *
+ * <rules>
+ * <rule .... />
+ * </rules>
+ *
+ * A wildcard can only be at the beginning or at the end (or both).
+ *
+ * Drop rules are always evaluated first.
+ *
+ * Drop examples:
+ * <rule action="drop" table="default:MyTable" />
+ * Do not send replication events for table MyTable
+ *
+ * <rule action="drop" table="default:MyTable" columnFamily="data"/>
+ * Do not send replication events for table MyTable's column family data
+ *
+ * <rule action="drop" table="default:MyTable" columnFamily="data" qualifier="dhold:*"/>
+ * Do not send replication events for qualifiers that start with dhold: on table MyTable's
+ * column family data
+ *
+ * Route examples:
+ *
+ * <rule action="routeRules" table="default:MyTable" topic="mytopic"/>
+ * Route all replication events for table default:MyTable to topic mytopic
+ *
+ * <rule action="routeRules" table="default:MyTable" columnFamily="data" topic="mytopic"/>
+ * Route all replication events for table default:MyTable column family data to topic mytopic
+ *
+ * <rule action="routeRules" table="default:MyTable" columnFamily="data" topic="mytopic"
+ * qualifier="hold:*"/>
+ * Route all replication events for qualifiers that start with hold: for table
+ * default:MyTable column family data to topic mytopic
+ */
+public class TopicRoutingRules {
+
+ private List<DropRule> dropRules = new ArrayList<>();
+ private List<TopicRule> routeRules = new ArrayList<>();
+
+ private File sourceFile;
+
+ /**
+ * used for testing
+ */
+ public TopicRoutingRules() {
+
+ }
+
+ /**
+ * construct rule set from file
+ * @param source file that contains the rule set
+ * @throws Exception if load fails
+ */
+ public TopicRoutingRules(File source) throws Exception {
+ this.sourceFile = source;
+ this.reloadIfFile();
+ }
+
+ /**
+ * Reload the ruleset if it was parsed from a file
+ * @throws Exception error loading rule set
+ */
+ public void reloadIfFile() throws Exception {
+ if (this.sourceFile != null) {
+ List<DropRule> dropRulesSave = this.dropRules;
+ List<TopicRule> routeRulesSave = this.routeRules;
+
+ try (FileInputStream fin = new FileInputStream(this.sourceFile)) {
+ List<DropRule> dropRulesNew = new ArrayList<>();
+ List<TopicRule> routeRulesNew = new ArrayList<>();
+
+ parseRules(fin, dropRulesNew, routeRulesNew);
+
+ this.dropRules = dropRulesNew;
+ this.routeRules = routeRulesNew;
+
+ } catch (Exception e) {
+ // roll back
+ this.dropRules = dropRulesSave;
+ this.routeRules = routeRulesSave;
+ // re-throw
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * parse rules manually from an input stream
+ * @param input InputStream that contains rule text
+ */
+ public void parseRules(InputStream input) {
+ List<DropRule> dropRulesNew = new ArrayList<>();
+ List<TopicRule> routeRulesNew = new ArrayList<>();
+ parseRules(input, dropRulesNew, routeRulesNew);
+ this.dropRules = dropRulesNew;
+ this.routeRules = routeRulesNew;
+ }
+
+ /**
+ * Parse the XML in the InputStream into route/drop rules and store them in the passed in Lists
+ * @param input InputStream that contains the ruleset
+ * @param dropRules list to accumulate drop rules
+ * @param routeRules list to accumulate route rules
+ */
+ public void parseRules(InputStream input, List<DropRule> dropRules,
+ List<TopicRule> routeRules) {
+ try {
+ DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
+ DocumentBuilder dBuilder = dbFactory.newDocumentBuilder();
+ Document doc = dBuilder.parse(input);
+ NodeList nodList = doc.getElementsByTagName("rule");
+ for (int i = 0; i < nodList.getLength(); i++) {
+ if (nodList.item(i) instanceof Element) {
+ parseRule((Element) nodList.item(i), dropRules, routeRules);
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Parse an individual rule from an Element.
+ * @param n the element
+ * @param dropRules list to accumulate drop rules
+ * @param routeRules list to accumulate route rules
+ */
+ public void parseRule(Element n, List<DropRule> dropRules, List<TopicRule> routeRules) {
+ Rule r = null;
+ if (n.getAttribute("action").equals("drop")) {
+ r = new DropRule();
+ dropRules.add((DropRule) r);
+ } else {
+ r = new TopicRule(n.getAttribute("topic"));
+ routeRules.add((TopicRule) r);
+ }
+ if (n.hasAttribute("table")) {
+ r.setTableName(TableName.valueOf(n.getAttribute("table")));
+ }
+ if (n.hasAttribute("columnFamily")) {
+ r.setColumnFamily(Bytes.toBytes(n.getAttribute("columnFamily")));
+ }
+ if (n.hasAttribute("qualifier")) {
+ String qual = n.getAttribute("qualifier");
+ r.setQualifier(Bytes.toBytes(qual));
+ }
+ }
+
+ /**
+ * Indicates if a cell mutation should be dropped instead of routed to kafka.
+ * @param table table name to check
+ * @param columnFamily column family to check
+ * @param qualifier qualifier name to check
+ * @return true if the mutation should be dropped instead of routed to Kafka
+ */
+ public boolean isExclude(final TableName table, final byte[] columnFamily,
+ final byte[] qualifier) {
+ for (DropRule r : getDropRules()) {
+ if (r.match(table, columnFamily, qualifier)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Get topics for the table/column family/qualifier combination
+ * @param table table name to check
+ * @param columnFamily column family to check
+ * @param qualifier qualifier name to check
+ * @return list of topics that match the passed in values (or empty for none).
+ */
+ public List<String> getTopics(TableName table, byte[] columnFamily, byte[] qualifier) {
+ List<String> ret = new ArrayList<>();
+ for (TopicRule r : getRouteRules()) {
+ if (r.match(table, columnFamily, qualifier)) {
+ ret.addAll(r.getTopics());
+ }
+ }
+
+ return ret;
+ }
+
+ /**
+ * returns all the drop rules (used for testing)
+ * @return drop rules
+ */
+ public List<DropRule> getDropRules() {
+ return dropRules;
+ }
+
+ /**
+ * returns all the route rules (used for testing)
+ * @return route rules
+ */
+ public List<TopicRule> getRouteRules() {
+ return routeRules;
+ }
+}
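A hedged end-to-end sketch (not part of the patch) that loads a small rule set from a string and queries it, mirroring the javadoc examples above; topic and table names are illustrative:

    package org.apache.hadoop.hbase.kafka;

    import java.io.ByteArrayInputStream;

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.util.Bytes;

    public class RoutingRulesSketch {
      public static void main(String[] args) {
        String xml =
            "<rules>"
            + "<rule action=\"routeRules\" table=\"default:MyTable\" topic=\"mytopic\"/>"
            + "<rule action=\"drop\" table=\"default:MyTable\" columnFamily=\"data\""
            + " qualifier=\"secret:*\"/>"
            + "</rules>";

        TopicRoutingRules rules = new TopicRoutingRules();
        rules.parseRules(new ByteArrayInputStream(Bytes.toBytes(xml)));

        TableName table = TableName.valueOf("default:MyTable");
        // drop rules are evaluated first, so this qualifier is excluded
        System.out.println(rules.isExclude(table, Bytes.toBytes("data"), Bytes.toBytes("secret:1")));
        // everything else on the table routes to "mytopic"
        System.out.println(rules.getTopics(table, Bytes.toBytes("data"), Bytes.toBytes("hold:1")));
      }
    }
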
diff --git a/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/TopicRule.java b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/TopicRule.java
new file mode 100755
index 0000000000..aba0c1d7ee
--- /dev/null
+++ b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/TopicRule.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.kafka;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Returns the configured topics when a Cell matches the rule.
+ */
+public class TopicRule extends Rule {
+ private Set<String> topics = new HashSet<>();
+
+ public TopicRule(String topics) {
+ this.topics.addAll(Arrays.stream(topics.split(",")).collect(Collectors.toList()));
+ }
+
+ public Set<String> getTopics() {
+ return topics;
+ }
+}
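A small usage sketch for TopicRule: the constructor splits its comma-separated argument into the topic set, and the match() check inherited from Rule decides whether getTopics() is consulted for a given cell coordinate. The table, family, and qualifier values here are illustrative only.

// Illustrative only: configure a TopicRule the way parseRule() does, then
// collect its topics when a cell coordinate matches.
TopicRule rule = new TopicRule("topicA,topicB");
rule.setTableName(TableName.valueOf("default:MyTable"));
rule.setColumnFamily(Bytes.toBytes("data"));
if (rule.match(TableName.valueOf("default:MyTable"),
    Bytes.toBytes("data"), Bytes.toBytes("anyQualifier"))) {
  Set<String> topics = rule.getTopics();   // contains topicA and topicB
}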
diff --git a/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/org/apache/hadoop/hbase/kafka/HbaseRpcControllerMock.java b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/org/apache/hadoop/hbase/kafka/HbaseRpcControllerMock.java
new file mode 100755
index 0000000000..6600c67d56
--- /dev/null
+++ b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/org/apache/hadoop/hbase/kafka/HbaseRpcControllerMock.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.org.apache.hadoop.hbase.kafka;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
+
+/**
+ * Mocks HBaseRpcController so we can pass in our own CellScanner for testing.
+ */
+public class HbaseRpcControllerMock implements HBaseRpcController {
+
+ public HbaseRpcControllerMock(final List<Cell> cellz) {
+ this.scanMe = new CellScanner() {
+ Iterator<Cell> it = cellz.iterator();
+
+ @Override
+ public Cell current() {
+ return it.next();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ return it.hasNext();
+ }
+ };
+ }
+
+ private CellScanner scanMe;
+
+ @Override
+ public CellScanner cellScanner() {
+ return scanMe;
+ }
+
+ @Override
+ public void setCellScanner(CellScanner cellScanner) {
+ this.scanMe = cellScanner;
+ }
+
+ @Override
+ public void setPriority(int priority) {
+
+ }
+
+ @Override
+ public void setPriority(TableName tn) {
+
+ }
+
+ @Override
+ public int getPriority() {
+ return 0;
+ }
+
+ @Override
+ public int getCallTimeout() {
+ return 0;
+ }
+
+ @Override
+ public void setCallTimeout(int callTimeout) {
+
+ }
+
+ @Override
+ public boolean hasCallTimeout() {
+ return false;
+ }
+
+ @Override
+ public void setFailed(IOException e) {
+
+ }
+
+ @Override
+ public IOException getFailed() {
+ return null;
+ }
+
+ @Override
+ public void setDone(CellScanner cellScanner) {
+
+ }
+
+ @Override
+ public void notifyOnCancel(RpcCallback<Object> callback) {
+
+ }
+
+ @Override
+ public void notifyOnCancel(RpcCallback<Object> callback, CancellationCallback action)
+ throws IOException {
+ }
+
+ @Override
+ public void reset() {
+
+ }
+
+ @Override
+ public boolean failed() {
+ return false;
+ }
+
+ @Override
+ public String errorText() {
+ return null;
+ }
+
+ @Override
+ public void startCancel() {
+
+ }
+
+ @Override
+ public void setFailed(String reason) {
+
+ }
+
+ @Override
+ public boolean isCanceled() {
+ return false;
+ }
+}
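In the mock above, advance() reports Iterator.hasNext() and current() returns Iterator.next(), so the mock only behaves sensibly under the usual advance-then-current loop. A minimal sketch of how a caller (such as the proxy under test) would presumably drain it; the variable names are illustrative.

// Drains the mocked CellScanner: advance() says whether a Cell remains,
// current() then hands it out (advance() may throw IOException).
CellScanner scanner = controller.cellScanner();
while (scanner.advance()) {
  Cell cell = scanner.current();
  // inspect the cell and decide whether to drop it or route it to Kafka
}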
diff --git a/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/org/apache/hadoop/hbase/kafka/ProducerMock.java b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/org/apache/hadoop/hbase/kafka/ProducerMock.java
new file mode 100755
index 0000000000..b05878bb0b
--- /dev/null
+++ b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/org/apache/hadoop/hbase/kafka/ProducerMock.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hbase.org.apache.hadoop.hbase.kafka;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.hadoop.hbase.kafka.HbaseKafkaEvent;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+
+/**
+ * Mocks Kafka producer for testing
+ */
+public class ProducerMock implements Producer<byte[], byte[]> {
+ Map<String, List<HbaseKafkaEvent>> messages = new HashMap<>();
+ SpecificDatumReader<HbaseKafkaEvent> dreader = new SpecificDatumReader<>(HbaseKafkaEvent.SCHEMA$);
+
+ public Map<String, List<HbaseKafkaEvent>> getMessages() {
+ return messages;
+ }
+
+ @Override
+ public Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> producerRecord) {
+ try {
+ BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(producerRecord.value(), null);
+ HbaseKafkaEvent event = dreader.read(null, decoder);
+ if (!messages.containsKey(producerRecord.topic())) {
+ messages.put(producerRecord.topic(), new ArrayList<>());
+ }
+ messages.get(producerRecord.topic()).add(event);
+ return null;
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> producerRecord,
+ Callback callback) {
+ return null;
+ }
+
+ @Override
+ public void flush() {
+
+ }
+
+ @Override
+ public List<PartitionInfo> partitionsFor(String s) {
+ return null;
+ }
+
+ @Override
+ public Map<MetricName, ? extends Metric> metrics() {
+ return null;
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public void close(long l, TimeUnit timeUnit) {
+
+ }
+}
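ProducerMock decodes every record value back into an HbaseKafkaEvent with Avro's BinaryDecoder, which implies the proxy serializes events with the matching Avro writer before calling send(). A hedged sketch of that encode path; event, topic, rowKeyBytes and producer are assumed context, and only the Avro/Kafka calls mirror the decode above.

// Assumed encode path that mirrors the decode in ProducerMock.send():
// serialize the Avro record to bytes and hand it to the Kafka producer.
ByteArrayOutputStream bout = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(bout, null);
SpecificDatumWriter<HbaseKafkaEvent> writer =
    new SpecificDatumWriter<>(HbaseKafkaEvent.SCHEMA$);
writer.write(event, encoder);
encoder.flush();
producer.send(new ProducerRecord<>(topic, rowKeyBytes, bout.toByteArray()));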
diff --git a/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/org/apache/hadoop/hbase/kafka/TestDropRule.java b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/org/apache/hadoop/hbase/kafka/TestDropRule.java
new file mode 100755
index 0000000000..60369599a9
--- /dev/null
+++ b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/org/apache/hadoop/hbase/kafka/TestDropRule.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hbase.org.apache.hadoop.hbase.kafka;
+
+import java.io.ByteArrayInputStream;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.kafka.DropRule;
+import org.apache.hadoop.hbase.kafka.TopicRoutingRules;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test different cases of drop rules.
+ */
+
+@Category(SmallTests.class)
+public class TestDropRule {
+ private static final String DROP_RULE1 =
+ " ";
+ private static final String DROP_RULE2 =
+ " ";
+ private static final String DROP_RULE3 =
+ " ";
+
+ private static final String DROP_RULE4 =
+ " ";
+ private static final String DROP_RULE5 =
+ " ";
+
+ private static final String DROP_RULE6 =
+ " ";
+
+ @Test
+ public void testDropies1() {
+ TopicRoutingRules rules = new TopicRoutingRules();
+ try {
+ rules.parseRules(new ByteArrayInputStream(DROP_RULE1.getBytes()));
+ Assert.assertEquals(1, rules.getDropRules().size());
+ Assert.assertEquals(TableName.valueOf("default:MyTable"),
+ rules.getDropRules().get(0).getTableName());
+ Assert.assertEquals(null, rules.getDropRules().get(0).getColumnFamily());
+ Assert.assertEquals(null, rules.getDropRules().get(0).getQualifier());
+ Assert.assertEquals(0, rules.getRouteRules().size());
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testDropies2() {
+ TopicRoutingRules rules = new TopicRoutingRules();
+ try {
+ rules.parseRules(new ByteArrayInputStream(DROP_RULE2.getBytes()));
+ Assert.assertEquals(1, rules.getDropRules().size());
+ Assert.assertEquals(TableName.valueOf("default:MyTable"),
+ rules.getDropRules().get(0).getTableName());
+ Assert.assertTrue(
+ Bytes.equals("data".getBytes(), rules.getDropRules().get(0).getColumnFamily()));
+ Assert.assertEquals(null, rules.getDropRules().get(0).getQualifier());
+ Assert.assertEquals(0, rules.getRouteRules().size());
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testDropies3() {
+ TopicRoutingRules rules = new TopicRoutingRules();
+ try {
+ rules.parseRules(new ByteArrayInputStream(DROP_RULE3.getBytes()));
+ Assert.assertEquals(1, rules.getDropRules().size());
+ Assert.assertEquals(TableName.valueOf("default:MyTable"),
+ rules.getDropRules().get(0).getTableName());
+ Assert.assertTrue(
+ Bytes.equals("data".getBytes(), rules.getDropRules().get(0).getColumnFamily()));
+ Assert
+ .assertTrue(Bytes.equals("dhold".getBytes(), rules.getDropRules().get(0).getQualifier()));
+ Assert.assertEquals(0, rules.getRouteRules().size());
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testDropies4() {
+ TopicRoutingRules rules = new TopicRoutingRules();
+ try {
+ rules.parseRules(new ByteArrayInputStream(DROP_RULE4.getBytes()));
+ Assert.assertEquals(1, rules.getDropRules().size());
+ Assert.assertEquals(TableName.valueOf("default:MyTable"),
+ rules.getDropRules().get(0).getTableName());
+ Assert.assertTrue(
+ Bytes.equals("data".getBytes(), rules.getDropRules().get(0).getColumnFamily()));
+ Assert.assertTrue(
+ Bytes.equals("dhold:".getBytes(), rules.getDropRules().get(0).getQualifier()));
+ Assert.assertEquals(0, rules.getRouteRules().size());
+
+ DropRule drop = rules.getDropRules().get(0);
+ Assert.assertFalse(
+ drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(), "blah".getBytes()));
+ Assert.assertFalse(
+ drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(), "dholdme".getBytes()));
+ Assert.assertTrue(
+ drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(), "dhold:me".getBytes()));
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testDropies5() {
+ TopicRoutingRules rules = new TopicRoutingRules();
+ try {
+ rules.parseRules(new ByteArrayInputStream(DROP_RULE5.getBytes()));
+ Assert.assertEquals(1, rules.getDropRules().size());
+ Assert.assertEquals(TableName.valueOf("default:MyTable"),
+ rules.getDropRules().get(0).getTableName());
+ Assert.assertTrue(
+ Bytes.equals("data".getBytes(), rules.getDropRules().get(0).getColumnFamily()));
+ Assert.assertTrue(
+ Bytes.equals("pickme".getBytes(), rules.getDropRules().get(0).getQualifier()));
+ Assert.assertEquals(0, rules.getRouteRules().size());
+
+ DropRule drop = rules.getDropRules().get(0);
+ Assert.assertFalse(
+ drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(), "blah".getBytes()));
+ Assert.assertFalse(drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(),
+ "blacickme".getBytes()));
+ Assert.assertTrue(drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(),
+ "hithere.pickme".getBytes()));
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testDropies6() {
+ TopicRoutingRules rules = new TopicRoutingRules();
+ try {
+ rules.parseRules(new ByteArrayInputStream(DROP_RULE6.getBytes()));
+ Assert.assertEquals(1, rules.getDropRules().size());
+ Assert.assertEquals(TableName.valueOf("default:MyTable"),
+ rules.getDropRules().get(0).getTableName());
+ Assert.assertTrue(
+ Bytes.equals("data".getBytes(), rules.getDropRules().get(0).getColumnFamily()));
+ Assert.assertTrue(
+ Bytes.equals("pickme".getBytes(), rules.getDropRules().get(0).getQualifier()));
+ Assert.assertEquals(0, rules.getRouteRules().size());
+
+ DropRule drop = rules.getDropRules().get(0);
+ Assert.assertFalse(
+ drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(), "blah".getBytes()));
+ Assert.assertFalse(drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(),
+ "blacickme".getBytes()));
+ Assert.assertTrue(drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(),
+ "hithere.pickme".getBytes()));
+ Assert.assertTrue(drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(),
+ "pickme.pleaze.do.it".getBytes()));
+ Assert.assertFalse(drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(),
+ "please.pickme.pleaze".getBytes()));
+ Assert.assertTrue(drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(),
+ "pickme.pleaze.pickme".getBytes()));
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+}
diff --git a/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/org/apache/hadoop/hbase/kafka/TestProxyRegionServer.java b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/org/apache/hadoop/hbase/kafka/TestProxyRegionServer.java
new file mode 100755
index 0000000000..68e6e320a5
--- /dev/null
+++ b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/org/apache/hadoop/hbase/kafka/TestProxyRegionServer.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hbase.org.apache.hadoop.hbase.kafka;
+
+import java.io.ByteArrayInputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.kafka.ProxyHRegionServer;
+import org.apache.hadoop.hbase.kafka.TopicRoutingRules;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test ProxyHRegionServer
+ */
+@Category(SmallTests.class)
+public class TestProxyRegionServer {
+
+ public AdminProtos.ReplicateWALEntryRequest makeReplicateWALEntryRequest(String table,
+ int cellcount) {
+ WALProtos.WALKey key1 = WALProtos.WALKey.newBuilder()
+ .setTableName(ByteString.copyFrom(table.getBytes())).buildPartial();
+
+ AdminProtos.WALEntry entry = AdminProtos.WALEntry.newBuilder().setAssociatedCellCount(cellcount)
+ .setKey(key1).buildPartial();
+
+ AdminProtos.ReplicateWALEntryRequest req =
+ AdminProtos.ReplicateWALEntryRequest.newBuilder().addEntry(entry).buildPartial();
+ return req;
+ }
+
+ @Test
+ public void testReplicationEventNoRules() {
+ ProxyHRegionServer myReplicationProxy = new ProxyHRegionServer();
+ List<Cell> cells = new ArrayList<>();
+ KeyValue kv = new KeyValue("row".getBytes(), "family".getBytes(), "qualifier".getBytes(), 1,
+ "value".getBytes());
+ cells.add(kv);
+
+ AdminProtos.ReplicateWALEntryRequest req =
+ makeReplicateWALEntryRequest("default:Mikey", cells.size());
+
+ TopicRoutingRules rules = new TopicRoutingRules();
+ rules.parseRules(new ByteArrayInputStream(" ".getBytes()));
+
+ ProducerMock mockProducer = new ProducerMock();
+ myReplicationProxy.setProducer(mockProducer);
+ myReplicationProxy.setRoutingRules(rules);
+
+ HBaseRpcController rpcMock = new HbaseRpcControllerMock(cells);
+
+ try {
+ myReplicationProxy.replicateWALEntry(rpcMock, req);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+
+ // no rules, so this should be empty.
+ Assert.assertEquals(0, mockProducer.getMessages().size());
+ }
+
+ @Test
+ public void testDropEvent() {
+ ProxyHRegionServer myReplicationProxy = new ProxyHRegionServer();
+ List<Cell> cells = new ArrayList<>();
+ KeyValue kv = new KeyValue("row1".getBytes(), "family".getBytes(), "qualifier".getBytes(), 1,
+ "value".getBytes());
+ cells.add(kv);
+
+ kv = new KeyValue("row2".getBytes(), "family".getBytes(), "qualifier".getBytes(), 1,
+ "value".getBytes());
+
+ AdminProtos.ReplicateWALEntryRequest req =
+ makeReplicateWALEntryRequest("default:Mikey", cells.size());
+
+ TopicRoutingRules rules = new TopicRoutingRules();
+ rules.parseRules(new ByteArrayInputStream(
+ " ".getBytes()));
+
+ ProducerMock mockProducer = new ProducerMock();
+ myReplicationProxy.setProducer(mockProducer);
+ myReplicationProxy.setRoutingRules(rules);
+
+ HBaseRpcController rpcMock = new HbaseRpcControllerMock(cells);
+
+ try {
+ myReplicationProxy.replicateWALEntry(rpcMock, req);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+
+ // the drop rule excludes these cells, so nothing should be routed.
+ Assert.assertEquals(0, mockProducer.getMessages().size());
+ }
+
+ @Test
+ public void testRouteBothEvents() {
+ ProxyHRegionServer myReplicationProxy = new ProxyHRegionServer();
+ List<Cell> cells = new ArrayList<>();
+ KeyValue kv = new KeyValue("row1".getBytes(), "family1".getBytes(), "qualifier1".getBytes(), 1,
+ "value1".getBytes());
+ cells.add(kv);
+
+ kv = new KeyValue("row2".getBytes(), "family2".getBytes(), "qualifier2".getBytes(), 2,
+ "value2".getBytes());
+ cells.add(kv);
+
+ AdminProtos.ReplicateWALEntryRequest req =
+ makeReplicateWALEntryRequest("default:Mikey", cells.size());
+
+ TopicRoutingRules rules = new TopicRoutingRules();
+ rules.parseRules(new ByteArrayInputStream(
+ " "
+ .getBytes()));
+
+ ProducerMock mockProducer = new ProducerMock();
+ myReplicationProxy.setProducer(mockProducer);
+ myReplicationProxy.setRoutingRules(rules);
+
+ HBaseRpcController rpcMock = new HbaseRpcControllerMock(cells);
+
+ try {
+ myReplicationProxy.replicateWALEntry(rpcMock, req);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+
+ // we routed both to topic foobar
+ Assert.assertEquals(1, mockProducer.getMessages().size());
+ Assert.assertEquals(2, mockProducer.getMessages().get("foobar").size());
+
+ String valueCheck =
+ new String(mockProducer.getMessages().get("foobar").get(0).getKey().array());
+ Assert.assertEquals("row1", valueCheck);
+ valueCheck = new String(mockProducer.getMessages().get("foobar").get(0).getFamily().array());
+ Assert.assertEquals("family1", valueCheck);
+ valueCheck = new String(mockProducer.getMessages().get("foobar").get(0).getQualifier().array());
+ Assert.assertEquals("qualifier1", valueCheck);
+ valueCheck = new String(mockProducer.getMessages().get("foobar").get(0).getValue().array());
+ Assert.assertEquals("value1", valueCheck);
+
+ valueCheck = new String(mockProducer.getMessages().get("foobar").get(1).getKey().array());
+ Assert.assertEquals("row2", valueCheck);
+ valueCheck = new String(mockProducer.getMessages().get("foobar").get(1).getFamily().array());
+ Assert.assertEquals("family2", valueCheck);
+ valueCheck = new String(mockProducer.getMessages().get("foobar").get(1).getQualifier().array());
+ Assert.assertEquals("qualifier2", valueCheck);
+ valueCheck = new String(mockProducer.getMessages().get("foobar").get(1).getValue().array());
+ Assert.assertEquals("value2", valueCheck);
+ }
+
+ @Test
+ public void testRoute1Event() {
+ ProxyHRegionServer myReplicationProxy = new ProxyHRegionServer();
+ List<Cell> cells = new ArrayList<>();
+ KeyValue kv = new KeyValue("row1".getBytes(), "family1".getBytes(), "qualifier1".getBytes(), 1,
+ "value1".getBytes());
+ cells.add(kv);
+
+ kv = new KeyValue("row2".getBytes(), "family2".getBytes(), "qualifier2".getBytes(), 2,
+ "value2".getBytes());
+ cells.add(kv);
+
+ AdminProtos.ReplicateWALEntryRequest req =
+ makeReplicateWALEntryRequest("default:Mikey", cells.size());
+
+ TopicRoutingRules rules = new TopicRoutingRules();
+ rules.parseRules(new ByteArrayInputStream(
+ (" ")
+ .getBytes()));
+
+ ProducerMock mockProducer = new ProducerMock();
+ myReplicationProxy.setProducer(mockProducer);
+ myReplicationProxy.setRoutingRules(rules);
+
+ HBaseRpcController rpcMock = new HbaseRpcControllerMock(cells);
+
+ try {
+ myReplicationProxy.replicateWALEntry(rpcMock, req);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+
+ // only the second cell matches the rule, so one message is routed to topic foobar
+ Assert.assertEquals(1, mockProducer.getMessages().size());
+ Assert.assertEquals(1, mockProducer.getMessages().get("foobar").size());
+
+ String valueCheck =
+ new String(mockProducer.getMessages().get("foobar").get(0).getKey().array());
+ Assert.assertEquals("row2", valueCheck);
+ valueCheck = new String(mockProducer.getMessages().get("foobar").get(0).getFamily().array());
+ Assert.assertEquals("family2", valueCheck);
+ valueCheck = new String(mockProducer.getMessages().get("foobar").get(0).getQualifier().array());
+ Assert.assertEquals("qualifier2", valueCheck);
+ valueCheck = new String(mockProducer.getMessages().get("foobar").get(0).getValue().array());
+ Assert.assertEquals("value2", valueCheck);
+ }
+
+ @Test
+ public void testRouteBothEvents2() {
+ ProxyHRegionServer myReplicationProxy = new ProxyHRegionServer();
+ List<Cell> cells = new ArrayList<>();
+ KeyValue kv = new KeyValue("row1".getBytes(), "family1".getBytes(), "qualifier1".getBytes(), 1,
+ "value1".getBytes());
+ cells.add(kv);
+
+ kv = new KeyValue("row2".getBytes(), "family2".getBytes(), "qualifier2".getBytes(), 2,
+ "value2".getBytes());
+ cells.add(kv);
+
+ AdminProtos.ReplicateWALEntryRequest req =
+ makeReplicateWALEntryRequest("default:Mikey", cells.size());
+
+ TopicRoutingRules rules = new TopicRoutingRules();
+ rules.parseRules(new ByteArrayInputStream(
+ (" ")
+ .getBytes()));
+
+ ProducerMock mockProducer = new ProducerMock();
+ myReplicationProxy.setProducer(mockProducer);
+ myReplicationProxy.setRoutingRules(rules);
+
+ HBaseRpcController rpcMock = new HbaseRpcControllerMock(cells);
+
+ try {
+ myReplicationProxy.replicateWALEntry(rpcMock, req);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+
+ // we routed both to topic foobar (using a wildcarded qualifier)
+ Assert.assertEquals(1, mockProducer.getMessages().size());
+ Assert.assertEquals(2, mockProducer.getMessages().get("foobar").size());
+
+ String valueCheck =
+ new String(mockProducer.getMessages().get("foobar").get(0).getKey().array());
+ Assert.assertEquals("row1", valueCheck);
+ valueCheck = new String(mockProducer.getMessages().get("foobar").get(0).getFamily().array());
+ Assert.assertEquals("family1", valueCheck);
+ valueCheck = new String(mockProducer.getMessages().get("foobar").get(0).getQualifier().array());
+ Assert.assertEquals("qualifier1", valueCheck);
+ valueCheck = new String(mockProducer.getMessages().get("foobar").get(0).getValue().array());
+ Assert.assertEquals("value1", valueCheck);
+
+ valueCheck = new String(mockProducer.getMessages().get("foobar").get(1).getKey().array());
+ Assert.assertEquals("row2", valueCheck);
+ valueCheck = new String(mockProducer.getMessages().get("foobar").get(1).getFamily().array());
+ Assert.assertEquals("family2", valueCheck);
+ valueCheck = new String(mockProducer.getMessages().get("foobar").get(1).getQualifier().array());
+ Assert.assertEquals("qualifier2", valueCheck);
+ valueCheck = new String(mockProducer.getMessages().get("foobar").get(1).getValue().array());
+ Assert.assertEquals("value2", valueCheck);
+ }
+
+}
\ No newline at end of file
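Taken together, these tests pin down the replicateWALEntry() behavior they expect from ProxyHRegionServer: cells arrive through the controller's CellScanner, drop rules are consulted first, and every surviving cell is fanned out to each topic its route rules name. The sketch below is a reconstruction from the assertions, not the actual implementation; routingRules, producer, entry, and toEvent() are assumed names.

// Hedged reconstruction of the flow these tests exercise; not the real code.
CellScanner scanner = ((HBaseRpcController) controller).cellScanner();
while (scanner.advance()) {
  Cell cell = scanner.current();
  TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray());
  byte[] family = CellUtil.cloneFamily(cell);
  byte[] qualifier = CellUtil.cloneQualifier(cell);
  if (routingRules.isExclude(table, family, qualifier)) {
    continue;   // drop rules win; nothing is sent to Kafka for this cell
  }
  for (String topic : routingRules.getTopics(table, family, qualifier)) {
    producer.send(new ProducerRecord<>(topic, CellUtil.cloneRow(cell),
        toEvent(cell)));   // toEvent(): hypothetical Avro serialization helper
  }
}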
diff --git a/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/org/apache/hadoop/hbase/kafka/TestQualifierMatching.java b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/org/apache/hadoop/hbase/kafka/TestQualifierMatching.java
new file mode 100755
index 0000000000..564baf51b8
--- /dev/null
+++ b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/org/apache/hadoop/hbase/kafka/TestQualifierMatching.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hbase.org.apache.hadoop.hbase.kafka;
+
+import org.apache.hadoop.hbase.kafka.DropRule;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Make sure qualifier match rules work.
+ */
+@Category(SmallTests.class)
+public class TestQualifierMatching {
+
+ @Test
+ public void testMatchQualifier() {
+ DropRule rule = new DropRule();
+ rule.setQualifier("data".getBytes());
+ Assert.assertTrue(rule.qualifierMatch("data".getBytes()));
+
+ rule = new DropRule();
+ rule.setQualifier("data1".getBytes());
+ Assert.assertFalse(rule.qualifierMatch("data".getBytes()));
+
+ // if not set, it is a wildcard
+ rule = new DropRule();
+ Assert.assertTrue(rule.qualifierMatch("data".getBytes()));
+ }
+
+ @Test
+ public void testStartWithQualifier() {
+ DropRule rule = new DropRule();
+ rule.setQualifier("data*".getBytes());
+ Assert.assertTrue(rule.isQualifierStartsWith());
+ Assert.assertFalse(rule.isQualifierEndsWith());
+
+ Assert.assertTrue(rule.qualifierMatch("data".getBytes()));
+ Assert.assertTrue(rule.qualifierMatch("data1".getBytes()));
+ Assert.assertTrue(rule.qualifierMatch("datafoobar".getBytes()));
+ Assert.assertFalse(rule.qualifierMatch("datfoobar".getBytes()));
+ Assert.assertFalse(rule.qualifierMatch("d".getBytes()));
+ Assert.assertFalse(rule.qualifierMatch("".getBytes()));
+ }
+
+ @Test
+ public void testEndsWithQualifier() {
+ DropRule rule = new DropRule();
+ rule.setQualifier("*data".getBytes());
+ Assert.assertFalse(rule.isQualifierStartsWith());
+ Assert.assertTrue(rule.isQualifierEndsWith());
+
+ Assert.assertTrue(rule.qualifierMatch("data".getBytes()));
+ Assert.assertTrue(rule.qualifierMatch("1data".getBytes()));
+ Assert.assertTrue(rule.qualifierMatch("foobardata".getBytes()));
+ Assert.assertFalse(rule.qualifierMatch("foobardat".getBytes()));
+ Assert.assertFalse(rule.qualifierMatch("d".getBytes()));
+ Assert.assertFalse(rule.qualifierMatch("".getBytes()));
+ }
+
+}
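These assertions fix the wildcard semantics for qualifiers: a trailing '*' turns the rule into a starts-with match, a leading '*' into an ends-with match, and a bare value into an exact match, with an unset qualifier matching everything. Below is a minimal sketch of a qualifierMatch() consistent with the tests, assuming the stored qualifier has its '*' markers stripped and the startsWith/endsWith flags set; this is not the actual Rule implementation.

// Consistent with TestQualifierMatching; not the real Rule.qualifierMatch().
boolean qualifierMatch(byte[] qual, byte[] ruleQual,
    boolean startsWith, boolean endsWith) {
  if (ruleQual == null) {
    return true;                                 // unset qualifier is a wildcard
  }
  if (startsWith) {
    return Bytes.startsWith(qual, ruleQual);     // "data*" style rules
  }
  if (endsWith) {                                // "*data" style rules
    return qual.length >= ruleQual.length
        && Bytes.equals(qual, qual.length - ruleQual.length, ruleQual.length,
            ruleQual, 0, ruleQual.length);
  }
  return Bytes.equals(qual, ruleQual);           // exact match otherwise
}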
diff --git a/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/org/apache/hadoop/hbase/kafka/TestRouteRules.java b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/org/apache/hadoop/hbase/kafka/TestRouteRules.java
new file mode 100755
index 0000000000..21ff45a417
--- /dev/null
+++ b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/org/apache/hadoop/hbase/kafka/TestRouteRules.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hbase.org.apache.hadoop.hbase.kafka;
+
+import java.io.ByteArrayInputStream;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.kafka.TopicRoutingRules;
+import org.apache.hadoop.hbase.kafka.TopicRule;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test route rules.
+ */
+@Category(SmallTests.class)
+public class TestRouteRules {
+ private static final String ROUTE_RULE1 =
+ " ";
+ private static final String ROUTE_RULE2 =
+ " ";
+ private static final String ROUTE_RULE3 =
+ " ";
+
+ private static final String ROUTE_RULE4 =
+ " ";
+ private static final String ROUTE_RULE5 =
+ " ";
+
+ private static final String ROUTE_RULE6 =
+ " ";
+
+ @Test
+ public void testTopic1() {
+ TopicRoutingRules rules = new TopicRoutingRules();
+ try {
+ rules.parseRules(new ByteArrayInputStream(ROUTE_RULE1.getBytes()));
+ Assert.assertEquals(1, rules.getRouteRules().size());
+ Assert.assertEquals(TableName.valueOf("default:MyTable"),
+ rules.getRouteRules().get(0).getTableName());
+ Assert.assertEquals(null, rules.getRouteRules().get(0).getColumnFamily());
+ Assert.assertEquals(null, rules.getRouteRules().get(0).getQualifier());
+ Assert.assertEquals(0, rules.getDropRules().size());
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testTopic2() {
+ TopicRoutingRules rules = new TopicRoutingRules();
+ try {
+ rules.parseRules(new ByteArrayInputStream(ROUTE_RULE2.getBytes()));
+ Assert.assertEquals(1, rules.getRouteRules().size());
+ Assert.assertEquals(TableName.valueOf("default:MyTable"),
+ rules.getRouteRules().get(0).getTableName());
+ Assert.assertTrue(
+ Bytes.equals("data".getBytes(), rules.getRouteRules().get(0).getColumnFamily()));
+ Assert.assertEquals(null, rules.getRouteRules().get(0).getQualifier());
+ Assert.assertEquals(0, rules.getDropRules().size());
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testTopic3() {
+ TopicRoutingRules rules = new TopicRoutingRules();
+ try {
+ rules.parseRules(new ByteArrayInputStream(ROUTE_RULE3.getBytes()));
+ Assert.assertEquals(1, rules.getRouteRules().size());
+ Assert.assertEquals(TableName.valueOf("default:MyTable"),
+ rules.getRouteRules().get(0).getTableName());
+ Assert.assertTrue(
+ Bytes.equals("data".getBytes(), rules.getRouteRules().get(0).getColumnFamily()));
+ Assert.assertTrue(
+ Bytes.equals("dhold".getBytes(), rules.getRouteRules().get(0).getQualifier()));
+ Assert.assertTrue(rules.getRouteRules().get(0).getTopics().contains("foo"));
+ Assert.assertEquals(1, rules.getRouteRules().get(0).getTopics().size());
+
+ Assert.assertEquals(0, rules.getDropRules().size());
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testTopic4() {
+ TopicRoutingRules rules = new TopicRoutingRules();
+ try {
+ rules.parseRules(new ByteArrayInputStream(ROUTE_RULE4.getBytes()));
+ Assert.assertEquals(1, rules.getRouteRules().size());
+ Assert.assertEquals(TableName.valueOf("default:MyTable"),
+ rules.getRouteRules().get(0).getTableName());
+ Assert.assertTrue(
+ Bytes.equals("data".getBytes(), rules.getRouteRules().get(0).getColumnFamily()));
+ Assert.assertTrue(
+ Bytes.equals("dhold:".getBytes(), rules.getRouteRules().get(0).getQualifier()));
+ Assert.assertEquals(0, rules.getDropRules().size());
+
+ TopicRule route = rules.getRouteRules().get(0);
+ Assert.assertFalse(
+ route.match(TableName.valueOf("default:MyTable"), "data".getBytes(), "blah".getBytes()));
+ Assert.assertFalse(
+ route.match(TableName.valueOf("default:MyTable"), "data".getBytes(), "dholdme".getBytes()));
+ Assert.assertTrue(route.match(TableName.valueOf("default:MyTable"), "data".getBytes(),
+ "dhold:me".getBytes()));
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testTopic5() {
+ TopicRoutingRules rules = new TopicRoutingRules();
+ try {
+ rules.parseRules(new ByteArrayInputStream(ROUTE_RULE5.getBytes()));
+ Assert.assertEquals(1, rules.getRouteRules().size());
+ Assert.assertEquals(TableName.valueOf("default:MyTable"),
+ rules.getRouteRules().get(0).getTableName());
+ Assert.assertTrue(
+ Bytes.equals("data".getBytes(), rules.getRouteRules().get(0).getColumnFamily()));
+ Assert.assertTrue(
+ Bytes.equals("pickme".getBytes(), rules.getRouteRules().get(0).getQualifier()));
+ Assert.assertEquals(0, rules.getDropRules().size());
+
+ TopicRule route = rules.getRouteRules().get(0);
+ Assert.assertFalse(
+ route.match(TableName.valueOf("default:MyTable"), "data".getBytes(), "blah".getBytes()));
+ Assert.assertFalse(route.match(TableName.valueOf("default:MyTable"), "data".getBytes(),
+ "blacickme".getBytes()));
+ Assert.assertTrue(route.match(TableName.valueOf("default:MyTable"), "data".getBytes(),
+ "hithere.pickme".getBytes()));
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testTopic6() {
+ TopicRoutingRules rules = new TopicRoutingRules();
+ try {
+ rules.parseRules(new ByteArrayInputStream(ROUTE_RULE6.getBytes()));
+ Assert.assertEquals(1, rules.getRouteRules().size());
+ Assert.assertEquals(TableName.valueOf("default:MyTable"),
+ rules.getRouteRules().get(0).getTableName());
+ Assert.assertTrue(
+ Bytes.equals("data".getBytes(), rules.getRouteRules().get(0).getColumnFamily()));
+ Assert.assertTrue(
+ Bytes.equals("pickme".getBytes(), rules.getRouteRules().get(0).getQualifier()));
+ Assert.assertEquals(0, rules.getDropRules().size());
+
+ TopicRule route = rules.getRouteRules().get(0);
+ Assert.assertFalse(
+ route.match(TableName.valueOf("default:MyTable"), "data".getBytes(), "blah".getBytes()));
+ Assert.assertFalse(route.match(TableName.valueOf("default:MyTable"), "data".getBytes(),
+ "blacickme".getBytes()));
+ Assert.assertTrue(route.match(TableName.valueOf("default:MyTable"), "data".getBytes(),
+ "hithere.pickme".getBytes()));
+ Assert.assertTrue(route.match(TableName.valueOf("default:MyTable"), "data".getBytes(),
+ "pickme.pleaze.do.it".getBytes()));
+ Assert.assertFalse(route.match(TableName.valueOf("default:MyTable"), "data".getBytes(),
+ "please.pickme.pleaze".getBytes()));
+ Assert.assertTrue(route.match(TableName.valueOf("default:MyTable"), "data".getBytes(),
+ "pickme.pleaze.pickme".getBytes()));
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+}
diff --git a/pom.xml b/pom.xml
index b42308527f..ea7db24bd8 100755
--- a/pom.xml
+++ b/pom.xml
@@ -92,6 +92,9 @@
hbase-metrics
hbase-spark-it
hbase-backup
+ hbase-kafka-model
+ hbase-kafka-proxy
+
@@ -1748,6 +1751,16 @@
org.apache.hbase
${project.version}
+