diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
old mode 100644
new mode 100755
diff --git a/hbase-kafka-model/pom.xml b/hbase-kafka-model/pom.xml
new file mode 100755
index 0000000000..1fa0c31211
--- /dev/null
+++ b/hbase-kafka-model/pom.xml
@@ -0,0 +1,225 @@
+<?xml version="1.0"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <!--
+  /**
+   * Licensed to the Apache Software Foundation (ASF) under one
+   * or more contributor license agreements.  See the NOTICE file
+   * distributed with this work for additional information
+   * regarding copyright ownership.  The ASF licenses this file
+   * to you under the Apache License, Version 2.0 (the
+   * "License"); you may not use this file except in compliance
+   * with the License.  You may obtain a copy of the License at
+   *
+   *     http://www.apache.org/licenses/LICENSE-2.0
+   *
+   * Unless required by applicable law or agreed to in writing, software
+   * distributed under the License is distributed on an "AS IS" BASIS,
+   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   * See the License for the specific language governing permissions and
+   * limitations under the License.
+   */
+  -->
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>hbase-build-configuration</artifactId>
+    <groupId>org.apache.hbase</groupId>
+    <version>3.0.0-SNAPSHOT</version>
+    <relativePath>../hbase-build-configuration</relativePath>
+  </parent>
+
+  <artifactId>hbase-kafka-model</artifactId>
+  <name>Apache HBase - Model Objects for Kafka Proxy</name>
+  <description>Model objects that represent HBase mutations</description>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <sourceDirectory>${project.basedir}/target/java</sourceDirectory>
+    <resources>
+      <resource>
+        <directory>src/main/resources/</directory>
+        <includes>
+          <include>hbase-default.xml</include>
+        </includes>
+      </resource>
+    </resources>
+    <testResources>
+      <testResource>
+        <directory>src/test/resources/META-INF/</directory>
+        <targetPath>META-INF/</targetPath>
+        <includes>
+          <include>NOTICE</include>
+        </includes>
+        <filtering>true</filtering>
+      </testResource>
+    </testResources>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-maven-plugin</artifactId>
+        <version>${avro.version}</version>
+        <executions>
+          <execution>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>schema</goal>
+            </goals>
+            <configuration>
+              <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
+              <outputDirectory>${project.basedir}/target/java/</outputDirectory>
+              <includes>
+                <include>**/*.avro</include>
+              </includes>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-remote-resources-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-site-plugin</artifactId>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <skipAssembly>true</skipAssembly>
+        </configuration>
+      </plugin>
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <properties>
+            <property>
+              <name>listener</name>
+              <value>org.apache.hadoop.hbase.ResourceCheckerJUnitListener</value>
+            </property>
+          </properties>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-source-plugin</artifactId>
+        <configuration>
+          <excludes>
+            <exclude>hbase-default.xml</exclude>
+          </excludes>
+        </configuration>
+      </plugin>
+    </plugins>
+    <pluginManagement>
+      <plugins>
+        <!--This plugin's configuration is used to store Eclipse m2e settings
+            only. It has no influence on the Maven build itself.-->
+        <plugin>
+          <groupId>org.eclipse.m2e</groupId>
+          <artifactId>lifecycle-mapping</artifactId>
+          <version>1.0.0</version>
+          <configuration>
+            <lifecycleMappingMetadata>
+              <pluginExecutions>
+                <pluginExecution>
+                  <pluginExecutionFilter>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-antrun-plugin</artifactId>
+                    <versionRange>[${maven.antrun.version}]</versionRange>
+                    <goals>
+                      <goal>run</goal>
+                    </goals>
+                  </pluginExecutionFilter>
+                  <action>
+                    <ignore></ignore>
+                  </action>
+                </pluginExecution>
+                <pluginExecution>
+                  <pluginExecutionFilter>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-dependency-plugin</artifactId>
+                    <versionRange>[2.8,)</versionRange>
+                    <goals>
+                      <goal>build-classpath</goal>
+                    </goals>
+                  </pluginExecutionFilter>
+                  <action>
+                    <ignore></ignore>
+                  </action>
+                </pluginExecution>
+                <pluginExecution>
+                  <pluginExecutionFilter>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-compiler-plugin</artifactId>
+                    <versionRange>[3.2,)</versionRange>
+                    <goals>
+                      <goal>compile</goal>
+                    </goals>
+                  </pluginExecutionFilter>
+                  <action>
+                    <ignore></ignore>
+                  </action>
+                </pluginExecution>
+              </pluginExecutions>
+            </lifecycleMappingMetadata>
+          </configuration>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+  </build>
+
+  <profiles>
+    <profile>
+      <id>apache-release</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-resources-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>license-javadocs</id>
+                <phase>prepare-package</phase>
+                <goals>
+                  <goal>copy-resources</goal>
+                </goals>
+                <configuration>
+                  <outputDirectory>${project.build.directory}/apidocs</outputDirectory>
+                  <resources>
+                    <resource>
+                      <directory>src/main/javadoc/META-INF/</directory>
+                      <targetPath>META-INF/</targetPath>
+                      <includes>
+                        <include>NOTICE</include>
+                      </includes>
+                      <filtering>true</filtering>
+                    </resource>
+                  </resources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+</project>
diff --git a/hbase-kafka-model/src/main/avro/HbaseKafkaEvent.avro b/hbase-kafka-model/src/main/avro/HbaseKafkaEvent.avro
new file mode 100755
index 0000000000..ec88627441
--- /dev/null
+++ b/hbase-kafka-model/src/main/avro/HbaseKafkaEvent.avro
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+{"namespace": "org.apache.hadoop.hbase.kafka",
+ "type": "record",
+ "name": "HBaseKafkaEvent",
+ "fields": [
+ {"name": "key", "type": "bytes"},
+ {"name": "timestamp", "type": "long" },
+ {"name": "delete", "type": "boolean" },
+ {"name": "value", "type": "bytes"},
+ {"name": "qualifier", "type": "bytes"},
+ {"name": "family", "type": "bytes"},
+ {"name": "table", "type": "bytes"}
+ ]
+}
diff --git a/hbase-kafka-proxy/pom.xml b/hbase-kafka-proxy/pom.xml
new file mode 100755
index 0000000000..fae8a8123a
--- /dev/null
+++ b/hbase-kafka-proxy/pom.xml
@@ -0,0 +1,239 @@
+<?xml version="1.0"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <!--
+  /**
+   * Licensed to the Apache Software Foundation (ASF) under one
+   * or more contributor license agreements.  See the NOTICE file
+   * distributed with this work for additional information
+   * regarding copyright ownership.  The ASF licenses this file
+   * to you under the Apache License, Version 2.0 (the
+   * "License"); you may not use this file except in compliance
+   * with the License.  You may obtain a copy of the License at
+   *
+   *     http://www.apache.org/licenses/LICENSE-2.0
+   *
+   * Unless required by applicable law or agreed to in writing, software
+   * distributed under the License is distributed on an "AS IS" BASIS,
+   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   * See the License for the specific language governing permissions and
+   * limitations under the License.
+   */
+  -->
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>hbase-build-configuration</artifactId>
+    <groupId>org.apache.hbase</groupId>
+    <version>3.0.0-SNAPSHOT</version>
+    <relativePath>../hbase-build-configuration</relativePath>
+  </parent>
+
+  <artifactId>hbase-kafka-proxy</artifactId>
+  <name>Apache HBase - Kafka Proxy</name>
+  <description>Proxy that forwards HBase replication events to a Kafka broker</description>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-remote-resources-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-site-plugin</artifactId>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <skipAssembly>true</skipAssembly>
+        </configuration>
+      </plugin>
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <properties>
+            <property>
+              <name>listener</name>
+              <value>org.apache.hadoop.hbase.ResourceCheckerJUnitListener</value>
+            </property>
+          </properties>
+        </configuration>
+      </plugin>
+    </plugins>
+    <pluginManagement>
+      <plugins>
+        <!--This plugin's configuration is used to store Eclipse m2e settings
+            only. It has no influence on the Maven build itself.-->
+        <plugin>
+          <groupId>org.eclipse.m2e</groupId>
+          <artifactId>lifecycle-mapping</artifactId>
+          <version>1.0.0</version>
+          <configuration>
+            <lifecycleMappingMetadata>
+              <pluginExecutions>
+                <pluginExecution>
+                  <pluginExecutionFilter>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-antrun-plugin</artifactId>
+                    <versionRange>[${maven.antrun.version}]</versionRange>
+                    <goals>
+                      <goal>run</goal>
+                    </goals>
+                  </pluginExecutionFilter>
+                  <action>
+                    <ignore></ignore>
+                  </action>
+                </pluginExecution>
+                <pluginExecution>
+                  <pluginExecutionFilter>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-dependency-plugin</artifactId>
+                    <versionRange>[2.8,)</versionRange>
+                    <goals>
+                      <goal>build-classpath</goal>
+                    </goals>
+                  </pluginExecutionFilter>
+                  <action>
+                    <ignore></ignore>
+                  </action>
+                </pluginExecution>
+                <pluginExecution>
+                  <pluginExecutionFilter>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-compiler-plugin</artifactId>
+                    <versionRange>[3.2,)</versionRange>
+                    <goals>
+                      <goal>compile</goal>
+                    </goals>
+                  </pluginExecutionFilter>
+                  <action>
+                    <ignore></ignore>
+                  </action>
+                </pluginExecution>
+              </pluginExecutions>
+            </lifecycleMappingMetadata>
+          </configuration>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-kafka-model</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-zookeeper</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-annotations</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>1.0.0</version>
+    </dependency>
+    <!-- General dependencies -->
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-collections4</artifactId>
+      <version>${collections.version}</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-crypto</artifactId>
+    </dependency>
+  </dependencies>
+
+  <profiles>
+    <profile>
+      <id>apache-release</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-resources-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>license-javadocs</id>
+                <phase>prepare-package</phase>
+                <goals>
+                  <goal>copy-resources</goal>
+                </goals>
+                <configuration>
+                  <outputDirectory>${project.build.directory}/apidocs</outputDirectory>
+                  <resources>
+                    <resource>
+                      <directory>src/main/javadoc/META-INF/</directory>
+                      <targetPath>META-INF/</targetPath>
+                      <includes>
+                        <include>NOTICE</include>
+                      </includes>
+                      <filtering>true</filtering>
+                    </resource>
+                  </resources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <!-- Skip the tests in this module -->
+    <profile>
+      <id>skipKafkaProxyTests</id>
+      <activation>
+        <property>
+          <name>skipKafkaProxyTests</name>
+        </property>
+      </activation>
+      <properties>
+        <surefire.skipFirstPart>true</surefire.skipFirstPart>
+        <surefire.skipSecondPart>true</surefire.skipSecondPart>
+      </properties>
+    </profile>
+  </profiles>
+</project>
diff --git a/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DropRule.java b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DropRule.java
new file mode 100755
index 0000000000..8bc1effed1
--- /dev/null
+++ b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DropRule.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.kafka;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Rule that indicates the Cell should not be replicated
+ */
+@InterfaceAudience.Private
+public class DropRule extends Rule {
+ public DropRule() {
+ }
+}
diff --git a/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DumpToStringListener.java b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DumpToStringListener.java
new file mode 100755
index 0000000000..5874f35cea
--- /dev/null
+++ b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DumpToStringListener.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.kafka;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.VersionInfo;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.org.apache.commons.cli.BasicParser;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
+
+
+/**
+ * connects to kafka and reads from the passed in topics. Parses each message into an avro object
+ * and dumps it to the console.
+ */
+@InterfaceAudience.Private
+public final class DumpToStringListener {
+ private static final Logger LOG = LoggerFactory.getLogger(DumpToStringListener.class);
+
+ private DumpToStringListener(){
+
+ }
+
+ public static void main(String[] args) {
+ LOG.info("***** STARTING service '" + DumpToStringListener.class.getSimpleName() + "' *****");
+ VersionInfo.logVersion();
+
+ Options options = new Options();
+ options.addOption("k", "kafkabrokers", true, "Kafka Brokers " +
+ "(comma delimited)");
+ options.addOption("t", "kafkatopics", true,"Kafka Topics "
+ + "to subscribe to (comma delimited)");
+ CommandLine commandLine = null;
+ try {
+ commandLine = new BasicParser().parse(options, args);
+ } catch (ParseException e) {
+ LOG.error("Could not parse: ", e);
+ printUsageAndExit(options, -1);
+ }
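+    // Each record value is an Avro-serialized HBaseKafkaEvent; this reader decodes
+    // the raw byte[] payloads pulled from Kafka before they are printed.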
+    SpecificDatumReader<HBaseKafkaEvent> dreader =
+        new SpecificDatumReader<>(HBaseKafkaEvent.SCHEMA$);
+
+ String topic = commandLine.getOptionValue('t');
+ Properties props = new Properties();
+ props.put("bootstrap.servers", commandLine.getOptionValue('k'));
+ props.put("group.id", "hbase kafka test tool");
+ props.put("key.deserializer", ByteArrayDeserializer.class.getName());
+ props.put("value.deserializer", ByteArrayDeserializer.class.getName());
+
+    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
+ consumer.subscribe(Arrays.stream(topic.split(",")).collect(Collectors.toList()));
+
+ while (true) {
+        ConsumerRecords<byte[], byte[]> records = consumer.poll(10000);
+        Iterator<ConsumerRecord<byte[], byte[]>> it = records.iterator();
+        while (it.hasNext()) {
+          ConsumerRecord<byte[], byte[]> record = it.next();
+ BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(record.value(), null);
+ try {
+ HBaseKafkaEvent event = dreader.read(null, decoder);
+ LOG.debug("key :" + Bytes.toString(record.key()) + " value " + event);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+ }
+
+ private static void printUsageAndExit(Options options, int exitCode) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("hbase " + DumpToStringListener.class.getName(), "", options,
+ "\n[--kafkabrokers ] " +
+ "[-k ] \n", true);
+ System.exit(exitCode);
+ }
+
+}
diff --git a/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/HbaseKafkaProxy.java b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/HbaseKafkaProxy.java
new file mode 100755
index 0000000000..ba83edc07e
--- /dev/null
+++ b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/HbaseKafkaProxy.java
@@ -0,0 +1,308 @@
+package org.apache.hadoop.hbase.kafka;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.BasicParser;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * HBase-to-Kafka bridge.
+ *
+ * Starts up a region server and receives replication events, just like a peer
+ * cluster member. It takes the events and, cell by cell, determines how to
+ * route them (see kafka-route-rules.xml).
+ */
+@InterfaceAudience.Private
+public class HbaseKafkaProxy {
+ private static final Logger LOG = LoggerFactory.getLogger(HbaseKafkaProxy.class);
+
+ public static final String KAFKA_PROXY_RULES_FILE = "kafkaproxy.rule.file";
+ public static final String KAFKA_PROXY_BROKERS = "kafkaproxy.kafka.brokers";
+ public static final String KAFKA_PROXY_KAFKA_PROPERTIES = "kafkaproxy.kafka.properties";
+ public static final String KAFKA_PROXY_KAFKA_BROKERS = "kafkaproxy.kafka.brokers";
+
+
+
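+  // Defaults that strip the embedded region server down to a bare replication
+  // endpoint: masterless mode, no WAL, tiny caches, and a Connection
+  // implementation that forwards mutations to Kafka instead of persisting them.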
+  private static Map<String, String> DEFAULT_PROPERTIES = new HashMap<>();
+ static {
+ DEFAULT_PROPERTIES.put("hbase.cluster.distributed","true");
+ DEFAULT_PROPERTIES.put("zookeeper.znode.parent","/kafkaproxy");
+ DEFAULT_PROPERTIES.put("hbase.regionserver.port","17020");
+ DEFAULT_PROPERTIES.put("hbase.regionserver.info.port","17010");
+ DEFAULT_PROPERTIES.put("hbase.client.connection.impl","org.apache.hadoop.hbase.kafka.KafkaBridgeConnection");
+ DEFAULT_PROPERTIES.put("hbase.regionserver.admin.service","false");
+ DEFAULT_PROPERTIES.put("hbase.regionserver.client.service","false");
+ DEFAULT_PROPERTIES.put("hbase.wal.provider","org.apache.hadoop.hbase.wal.DisabledWALProvider");
+ DEFAULT_PROPERTIES.put("hbase.regionserver.workers","false");
+ DEFAULT_PROPERTIES.put("hfile.block.cache.size","0.0001");
+ DEFAULT_PROPERTIES.put("hbase.mob.file.cache.size","0");
+ DEFAULT_PROPERTIES.put("hbase.masterless","true");
+ DEFAULT_PROPERTIES.put("hbase.regionserver.metahandler.count","1");
+ DEFAULT_PROPERTIES.put("hbase.regionserver.replication.handler.count","1");
+ DEFAULT_PROPERTIES.put("hbase.regionserver.handler.count","1");
+ DEFAULT_PROPERTIES.put("hbase.ipc.server.read.threadpool.size","3");
+ }
+
+ private static void printUsageAndExit(Options options, int exitCode) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("hbase kafkaproxy start", "", options,
+ "\nTo run the kafka proxy as a daemon, execute " +
+ "hbase-daemon.sh start|stop kafkaproxy " +
+ "[--kafkabrokers ] " +
+ "[-b ] " +
+ "[--routerulesfile ] " +
+ "[-r ] " +
+ "[--kafkaproperties ] " +
+ "[-f ] " +
+ "[--peername name of hbase peer to use (defaults to hbasekafka)] " +
+ "[-p name of hbase peer to use (defaults to hbasekafka)] " +
+ "[--znode root znode (defaults to /kafkaproxy)] " +
+ "[-z root znode (defaults to /kafkaproxy)] " +
+
+ "[--enablepeer enable peer on startup (defaults to false)] " +
+ "[-e enable peer on startup (defaults to false)] " +
+
+ "[--auto auto create peer] " +
+ "[-a auto create peer] \n", true);
+ System.exit(exitCode);
+ }
+
+  /**
+   * Entry point: parses the command line, registers the proxy znodes and
+   * replication peer, then starts the embedded region server.
+   * @param args command line arguments
+   * @throws Exception if startup fails
+   */
+  public static void main(String[] args) throws Exception {
+
+    Map<String, String> otherProps = new HashMap<>();
+
+ Options options = new Options();
+
+ options.addOption("b", "kafkabrokers", true,
+ "Kafka Brokers (comma delimited)");
+ options.addOption("r", "routerulesfile", true,
+ "file that has routing rules (defaults to conf/kafka-route-rules.xml");
+ options.addOption("f", "kafkaproperties", true,
+ "Path to properties file that has the kafka connection properties");
+ options.addOption("p", "peername", true,
+ "Name of hbase peer");
+ options.addOption("z", "znode", true,
+ "root zode to use in zookeeper (defaults to /kafkaproxy)");
+ options.addOption("a", "autopeer", false,
+ "Create a peer automatically to the hbase cluster");
+ options.addOption("e", "enablepeer", false,
+ "enable peer on startup (defaults to false)");
+
+
+ LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName());
+ VersionInfo.logVersion();
+
+ Configuration conf = HBaseConfiguration.create();
+ CommandLine commandLine = null;
+ try {
+ commandLine = new BasicParser().parse(options, args);
+ } catch (ParseException e) {
+ LOG.error("Could not parse: ", e);
+ printUsageAndExit(options, -1);
+ }
+
+ String peer="";
+ if (!commandLine.hasOption('p')){
+ System.err.println("hbase peer id is required");
+ System.exit(-1);
+ } else {
+ peer = commandLine.getOptionValue('p');
+ }
+
+    boolean createPeer = false;
+    boolean enablePeer = false;
+
+    if (commandLine.hasOption('a')) {
+      createPeer = true;
+    }
+
+    if (commandLine.hasOption('e')) {
+      enablePeer = true;
+    }
+
+ String rulesFile = StringUtils.defaultIfBlank(commandLine.getOptionValue("r"),"kafka-route-rules.xml");
+
+ if (!new File(rulesFile).exists()){
+ if (HbaseKafkaProxy.class.getClassLoader().getResource(rulesFile)!=null){
+ rulesFile = HbaseKafkaProxy.class.getClassLoader().getResource(rulesFile).getFile();
+ } else {
+ System.err.println("Rules file " + rulesFile +
+ " is invalid");
+ System.exit(-1);
+ }
+ }
+
+ otherProps.put(HbaseKafkaProxy.KAFKA_PROXY_RULES_FILE,rulesFile);
+
+ if (commandLine.hasOption('f')){
+ otherProps.put(HbaseKafkaProxy.KAFKA_PROXY_KAFKA_PROPERTIES,commandLine.getOptionValue('f'));
+ } else if (commandLine.hasOption('b')){
+ otherProps.put(HbaseKafkaProxy.KAFKA_PROXY_KAFKA_BROKERS,commandLine.getOptionValue('b'));
+ } else {
+ System.err.println("Kafka connection properites or brokers must be specified");
+ System.exit(-1);
+ }
+
+ String zookeeperQ = conf.get("hbase.zookeeper.quorum") + ":" +
+ conf.get("hbase.zookeeper.property.clientPort");
+
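+    // Register this proxy's synthetic hbaseid under the root znode and make sure the
+    // replication peer exists (optionally creating/enabling it) before the region
+    // server starts accepting replication traffic.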
+ ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(20000, 20);
+
+ try (CuratorFramework zk = CuratorFrameworkFactory.newClient(zookeeperQ, retryPolicy);
+ ) {
+ zk.start();
+ String rootZnode = "/kafkaproxy";
+ setupZookeeperZnodes(zk,rootZnode);
+ checkForOrCreateReplicationPeer(conf,zk,rootZnode,peer,createPeer,enablePeer);
+ }
+
+    @SuppressWarnings("unchecked")
+    Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
+        .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
+
+    List<String> addArgs = DEFAULT_PROPERTIES.keySet().stream()
+ .map((argKey)->("-D"+argKey+"="+ DEFAULT_PROPERTIES.get(argKey)))
+ .collect(Collectors.toList());
+
+ otherProps.keySet().stream()
+ .map((argKey)->("-D"+argKey+"="+ otherProps.get(argKey)))
+ .forEach((item)->addArgs.add(item));
+
+ Arrays.stream(args)
+ .filter((arg)->(arg.startsWith("-D")||arg.equalsIgnoreCase("start")))
+ .forEach((arg)->addArgs.add(arg));
+
+ LOG.info("Args passed to region server "+addArgs);
+
+    String[] newArgs = new String[addArgs.size()];
+ addArgs.toArray(newArgs);
+
+ new HRegionServerCommandLine(regionServerClass).doMain(newArgs);
+ }
+
+
+ /**
+ * Set up the needed znodes under the rootZnode
+ * @param zk CuratorFramework framework instance
+ * @param rootZnode Root znode
+ * @throws Exception If an error occurs
+ */
+ public static void setupZookeeperZnodes(CuratorFramework zk, String rootZnode) throws Exception {
+ // always gives the same uuid for the same name
+ UUID uuid = UUID.nameUUIDFromBytes(Bytes.toBytes(rootZnode));
+ byte []uuidBytes = Bytes.toBytes(uuid.toString());
+ String idPath=rootZnode+"/hbaseid";
+ if (zk.checkExists().forPath(idPath) == null) {
+ zk.create().creatingParentsIfNeeded().forPath(rootZnode +
+ "/hbaseid",uuidBytes);
+ } else {
+ // If the znode is there already make sure it has the
+ // expected value for the peer name.
+ if (!Bytes.equals(zk.getData().forPath(idPath).clone(),uuidBytes)){
+ LOG.warn("znode "+idPath+" has unexpected value "
+ + " (did the peer name for the proxy change?) "
+ + "Updating value");
+ zk.setData().forPath(idPath, uuidBytes);
+ }
+ }
+ }
+
+ /**
+ * Poll for the configured peer or create it if it does not exist
+ * (controlled by createIfMissing)
+   * @param hbaseConf the hbase configuration
+   * @param zk CuratorFramework object
+   * @param basePath base znode.
+   * @param peerName id of the peer to check for/create.
+   * @param createIfMissing if the peer doesn't exist, create it and peer to it.
+   * @param enablePeer if true, enable the peer once it exists.
+   */
+ public static void checkForOrCreateReplicationPeer(Configuration hbaseConf,
+ CuratorFramework zk,
+ String basePath,
+ String peerName, boolean createIfMissing,
+ boolean enablePeer) {
+ try (Connection conn = ConnectionFactory.createConnection(hbaseConf);
+ Admin admin = conn.getAdmin()) {
+
+ boolean peerThere = false;
+
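+      // Poll until the peer exists; if createIfMissing is set, add a peer whose
+      // cluster key points at this proxy's znode so the source cluster ships its
+      // edits here.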
+ while (!peerThere) {
+ try {
+ ReplicationPeerConfig peerConfig = admin.getReplicationPeerConfig(peerName);
+ if (peerConfig !=null) {
+ peerThere=true;
+ }
+ } catch (ReplicationPeerNotFoundException e) {
+ if (createIfMissing) {
+ ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
+ // get the current cluster's ZK config
+ String zookeeperQ = hbaseConf.get("hbase.zookeeper.quorum") +
+ ":" +
+ hbaseConf.get("hbase.zookeeper.property.clientPort");
+ String znodePath = zookeeperQ + ":"+basePath;
+ ReplicationPeerConfig rconf = builder.setClusterKey(znodePath).build();
+ admin.addReplicationPeer(peerName, rconf);
+ peerThere = true;
+ }
+ }
+
+ if (peerThere) {
+ if (enablePeer){
+ LOG.info("enable peer," + peerName);
+ admin.enableReplicationPeer(peerName);
+ }
+ break;
+ } else {
+ LOG.info("peer "+peerName+" not found, service will not completely start until the peer exists");
+ }
+ Thread.sleep(5000);
+ }
+
+ LOG.info("found replication peer " + peerName);
+
+ } catch (Exception e) {
+ LOG.error("Exception running proxy ",e);
+ }
+ }
+}
+
diff --git a/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaBridgeConnection.java b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaBridgeConnection.java
new file mode 100755
index 0000000000..ae91da5616
--- /dev/null
+++ b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaBridgeConnection.java
@@ -0,0 +1,183 @@
+package org.apache.hadoop.hbase.kafka;
+
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.BufferedMutatorParams;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableBuilder;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+
+
+/**
+ * An alternative Connection implementation that forwards mutations to a Kafka queue
+ * according to the routing rules (see kafka-route-rules.xml).
+ */
+public class KafkaBridgeConnection implements Connection {
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaBridgeConnection.class);
+
+ private final Configuration conf;
+ private final User user;
+ private final ExecutorService pool;
+ private volatile boolean closed = false;
+ private TopicRoutingRules routingRules;
+  private Producer<byte[], byte[]> producer;
+  private DatumWriter<HBaseKafkaEvent> avroWriter =
+      new SpecificDatumWriter<>(HBaseKafkaEvent.getClassSchema());
+
+
+ public KafkaBridgeConnection(Configuration conf, ExecutorService pool, User user) throws IOException {
+ this.conf = conf;
+ this.user = user;
+ this.pool = pool;
+ setupRules();
+ startKafkaConnection();
+ }
+
+ /**
+ * for testing.
+ * @param conf hbase configuration
+ * @param pool executor service
+ * @param user user with connection
+ * @param routingRules a set of routing rules
+ * @param producer a kafka producer
+ * @throws IOException
+ */
+  public KafkaBridgeConnection(Configuration conf, ExecutorService pool, User user,
+      TopicRoutingRules routingRules, Producer<byte[], byte[]> producer) throws IOException {
+ this.conf = conf;
+ this.user = user;
+ this.pool = pool;
+ this.producer=producer;
+ this.routingRules=routingRules;
+ }
+
+ private void setupRules() throws IOException {
+ String file = this.conf.get(HbaseKafkaProxy.KAFKA_PROXY_RULES_FILE);
+ routingRules = new TopicRoutingRules();
+ try (FileInputStream fin = new FileInputStream(file);){
+ routingRules.parseRules(fin);
+ }
+ }
+
+ private void startKafkaConnection() throws IOException {
+ Properties configProperties = new Properties();
+
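+    // Producer connection settings come either from a full properties file
+    // (kafkaproxy.kafka.properties) or from just a broker list
+    // (kafkaproxy.kafka.brokers); keys and values are always raw byte arrays.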
+ String kafkaPropsFile = conf.get(HbaseKafkaProxy.KAFKA_PROXY_KAFKA_PROPERTIES,"");
+ if (!StringUtils.isEmpty(kafkaPropsFile)){
+ try (FileInputStream fs = new java.io.FileInputStream(
+ new File(kafkaPropsFile))){
+ configProperties.load(fs);
+ }
+ } else {
+ String kafkaServers =conf.get(HbaseKafkaProxy.KAFKA_PROXY_KAFKA_BROKERS);
+ configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
+ }
+
+ configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.ByteArraySerializer");
+ configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.ByteArraySerializer");
+    this.producer = new KafkaProducer<>(configProperties);
+ }
+
+
+
+ @Override
+ public void abort(String why, Throwable e) {}
+
+ @Override
+ public boolean isAborted() {
+ return false;
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return this.conf;
+ }
+
+ @Override
+ public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
+ return null;
+ }
+
+ @Override
+ public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
+ return null;
+ }
+
+ @Override
+ public RegionLocator getRegionLocator(TableName tableName) throws IOException {
+ return null;
+ }
+
+ @Override
+ public Admin getAdmin() throws IOException {
+ return null;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!this.closed) {
+ this.closed = true;
+ this.producer.close();
+ }
+ }
+
+ @Override
+ public boolean isClosed() {
+ return this.closed;
+ }
+
+ @Override
+ public TableBuilder getTableBuilder(final TableName tn, ExecutorService pool) {
+ if (isClosed()) {
+ throw new RuntimeException("KafkaBridgeConnection is closed.");
+ }
+ final Configuration passedInConfiguration = getConfiguration();
+ return new TableBuilder() {
+ @Override
+ public TableBuilder setOperationTimeout(int timeout) {
+ return null;
+ }
+
+ @Override
+ public TableBuilder setRpcTimeout(int timeout) {
+ return null;
+ }
+
+ @Override
+ public TableBuilder setReadRpcTimeout(int timeout) {
+ return null;
+ }
+
+ @Override
+ public TableBuilder setWriteRpcTimeout(int timeout) {
+ return null;
+ }
+
+ @Override
+ public Table build() {
+ return new KafkaTableForBridge(tn,passedInConfiguration,routingRules,producer,avroWriter) ;
+ }
+ };
+ }
+
+}
diff --git a/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java
new file mode 100755
index 0000000000..fcee7323d2
--- /dev/null
+++ b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java
@@ -0,0 +1,401 @@
+package org.apache.hadoop.hbase.kafka;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.CompareOperator;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class KafkaTableForBridge implements Table {
+ private Logger LOG = LoggerFactory.getLogger(KafkaTableForBridge.class);
+
+ private final Configuration conf;
+ private final TableName tableName;
+ private byte[] tableAsBytes;
+
+  private Producer<byte[], byte[]> producer;
+  private TopicRoutingRules routingRules;
+
+  private DatumWriter<HBaseKafkaEvent> avroWriter;
+
+  private static final class CheckMutation {
+    byte[] qualifier;
+    byte[] family;
+    Cell cell;
+    List<String> topics = new ArrayList<>();
+  }
+
+  public KafkaTableForBridge(TableName tableName,
+      Configuration conf,
+      TopicRoutingRules routingRules,
+      Producer<byte[], byte[]> producer,
+      DatumWriter<HBaseKafkaEvent> avroWriter) {
+    this.conf = conf;
+    this.tableName = tableName;
+    this.tableAsBytes = this.tableName.toBytes();
+    this.routingRules = routingRules;
+    this.producer = producer;
+    this.avroWriter = avroWriter;
+  }
+
+  private List<Pair<String, HBaseKafkaEvent>> processMutation(CheckMutation check, boolean isDelete) {
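+    // Convert a matched cell into a single Avro event and emit one (topic, event)
+    // pair for every topic the routing rules selected for this cell.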
+ HBaseKafkaEvent event = new HBaseKafkaEvent();
+ event.setKey(ByteBuffer.wrap(check.cell.getRowArray(),check.cell.getRowOffset(),check.cell.getRowLength()));
+ event.setTable(ByteBuffer.wrap(tableAsBytes));
+ event.setDelete(isDelete);
+ event.setTimestamp(check.cell.getTimestamp());
+ event.setFamily(ByteBuffer.wrap(check.family));
+ event.setQualifier(ByteBuffer.wrap(check.qualifier));
+ event.setValue(ByteBuffer.wrap(check.cell.getValueArray(),check.cell.getValueOffset(),check.cell.getValueLength()));
+
+ return check.topics.stream()
+        .map((topic) -> new Pair<String, HBaseKafkaEvent>(topic, event))
+ .collect(Collectors.toList());
+ }
+
+ private boolean keep(CheckMutation ret){
+ if (!routingRules.isExclude(this.tableName,ret.family, ret.qualifier)){
+ return true;
+ }
+ return false;
+ }
+
+ private CheckMutation addTopics(CheckMutation ret){
+ ret.topics= routingRules.getTopics(this.tableName,ret.family,ret.qualifier);
+ return ret;
+ }
+
+  private ProducerRecord<byte[], byte[]> toByteArray(ByteArrayOutputStream bout,
+      Pair<String, HBaseKafkaEvent> event, BinaryEncoder encoder) {
+ try {
+ bout.reset();
+ BinaryEncoder encoderUse = EncoderFactory.get().binaryEncoder(bout, encoder);
+ avroWriter.write(event.getSecond(), encoderUse);
+ encoder.flush();
+      return new ProducerRecord<>(event.getFirst(),
+          event.getSecond().getKey().array(), bout.toByteArray());
+ } catch (Exception e){
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+  public void batch(final List<? extends Row> actions, Object[] results)
+ throws IOException, InterruptedException {
+
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ BinaryEncoder encoderUse = EncoderFactory.get().binaryEncoder(bout, null);
+
+ LOG.debug("got {} inputs ",actions.size());
+
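+    // Explode each Mutation into its cells, drop cells the rules exclude, attach
+    // the target topics, serialize each event to Avro, and send the records to
+    // Kafka before flushing the producer.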
+ actions.stream()
+ .filter((row)->row instanceof Mutation)
+ .map((row)->(Mutation)row)
+ .flatMap((row)->{
+ Mutation mut = (Mutation) row;
+ boolean isDelete = mut instanceof Delete;
+ return mut.getFamilyCellMap().keySet().stream()
+ .flatMap((family)->mut.getFamilyCellMap().get(family).stream())
+ .map((cell)->{
+ CheckMutation ret = new CheckMutation();
+ ret.family=CellUtil.cloneFamily(cell);
+ ret.qualifier=CellUtil.cloneQualifier(cell);
+ ret.cell=cell;
+ return ret;
+ })
+ .filter((check)->keep(check))
+ .map((check)->addTopics(check))
+ .filter((check)->!CollectionUtils.isEmpty(check.topics))
+ .flatMap((check)->processMutation(check,isDelete ).stream());
+ })
+ .map((event)->toByteArray(bout,event,encoderUse))
+ .forEach((item)->producer.send(item));
+
+ this.producer.flush();
+ }
+
+ @Override
+ public long getReadRpcTimeout(TimeUnit unit) {
+ return 0;
+ }
+
+ @Override
+ public Table.CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
+ return null;
+ }
+
+ @Override
+ public long getOperationTimeout(TimeUnit unit) {
+ return 0;
+ }
+
+ @Override
+ public long getWriteRpcTimeout(TimeUnit unit) {
+ return 0;
+ }
+
+ @Override
+ public long getRpcTimeout(TimeUnit unit) {
+ return 0;
+ }
+
+ @Override
+  public boolean[] exists(List<Get> gets) {
+ return new boolean[0];
+ }
+
+ @Override
+ public TableName getName() {
+ return this.tableName;
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return this.conf;
+ }
+
+ @Override
+ public HTableDescriptor getTableDescriptor() throws IOException {
+ return null;
+ }
+
+ @Override
+ public TableDescriptor getDescriptor() throws IOException {
+ return null;
+ }
+
+ @Override
+ public boolean exists(Get get) throws IOException {
+ return false;
+ }
+
+ @Override
+  public boolean[] existsAll(List<Get> gets) throws IOException {
+ return new boolean[0];
+ }
+
+ @Override
+  public <R> void batchCallback(List<? extends Row> actions, Object[] results,
+      Batch.Callback<R> callback) throws IOException, InterruptedException {
+
+ }
+
+ @Override
+ public Result get(Get get) throws IOException {
+ return null;
+ }
+
+ @Override
+  public Result[] get(List<Get> gets) throws IOException {
+ return new Result[0];
+ }
+
+ @Override
+ public ResultScanner getScanner(Scan scan) throws IOException {
+ return null;
+ }
+
+ @Override
+ public ResultScanner getScanner(byte[] family) throws IOException {
+ return null;
+ }
+
+ @Override
+ public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
+ return null;
+ }
+
+ @Override
+ public void put(Put put) throws IOException {
+ }
+
+ @Override
+  public void put(List<Put> puts) throws IOException {
+
+ }
+
+ @Override
+ public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put) throws IOException {
+ return false;
+ }
+
+ @Override
+ public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Put put) throws IOException {
+ return false;
+ }
+
+ @Override
+ public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, byte[] value, Put put) throws IOException {
+ return false;
+ }
+
+ @Override
+ public void delete(Delete delete) throws IOException {
+
+ }
+
+ @Override
+  public void delete(List<Delete> deletes) throws IOException {
+
+ }
+
+ @Override
+ public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value, Delete delete) throws IOException {
+ return false;
+ }
+
+ @Override
+ public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Delete delete) throws IOException {
+ return false;
+ }
+
+ @Override
+ public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, byte[] value, Delete delete) throws IOException {
+ return false;
+ }
+
+ @Override
+ public void mutateRow(RowMutations rm) throws IOException {
+
+ }
+
+ @Override
+ public Result append(Append append) throws IOException {
+ return null;
+ }
+
+ @Override
+ public Result increment(Increment increment) throws IOException {
+ return null;
+ }
+
+ @Override
+ public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException {
+ return 0;
+ }
+
+ @Override
+ public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability) throws IOException {
+ return 0;
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ @Override
+ public CoprocessorRpcChannel coprocessorService(byte[] row) {
+ return null;
+ }
+
+ @Override
+  public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
+      byte[] startKey, byte[] endKey, Batch.Call<T, R> callable) throws ServiceException, Throwable {
+ return null;
+ }
+
+ @Override
+  public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey,
+      byte[] endKey, Batch.Call<T, R> callable, Batch.Callback<R> callback)
+      throws ServiceException, Throwable {
+
+ }
+
+ @Override
+  public <R extends Message> Map<byte[], R> batchCoprocessorService(
+      Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey,
+      byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
+ return null;
+ }
+
+ @Override
+  public <R extends Message> void batchCoprocessorService(
+      Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey,
+      byte[] endKey, R responsePrototype, Batch.Callback<R> callback)
+      throws ServiceException, Throwable {
+
+ }
+
+ @Override
+ public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException {
+ return false;
+ }
+
+ @Override
+ public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, byte[] value, RowMutations mutation) throws IOException {
+ return false;
+ }
+
+ @Override
+ public void setOperationTimeout(int operationTimeout) {
+
+ }
+
+ @Override
+ public int getOperationTimeout() {
+ return 0;
+ }
+
+ @Override
+ public int getRpcTimeout() {
+ return 0;
+ }
+
+ @Override
+ public void setRpcTimeout(int rpcTimeout) {
+
+ }
+
+ @Override
+ public int getReadRpcTimeout() {
+ return 0;
+ }
+
+ @Override
+ public void setReadRpcTimeout(int readRpcTimeout) {
+
+ }
+
+ @Override
+ public int getWriteRpcTimeout() {
+ return 0;
+ }
+
+ @Override
+ public void setWriteRpcTimeout(int writeRpcTimeout) {
+
+ }
+}
diff --git a/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/Rule.java b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/Rule.java
new file mode 100755
index 0000000000..7d02025bb1
--- /dev/null
+++ b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/Rule.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.kafka;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+
+
+/**
+ * Implements the matching logic for a rule
+ */
+@InterfaceAudience.Private
+public abstract class Rule {
+ TableName tableName;
+ private byte [] columnFamily;
+ private byte [] qualifier;
+
+ boolean qualifierStartsWith = false;
+ boolean qualifierEndsWith = false;
+
+ byte []ast = Bytes.toBytes("*");
+
+ /**
+ * Indicates if the table,column family, and qualifier match the rule
+ * @param tryTable table name to test
+ * @param tryColumFamily column family to test
+ * @param tryQualifier qualifier to test
+ * @return true if values match the rule
+ */
+ public boolean match(TableName tryTable, byte [] tryColumFamily, byte [] tryQualifier) {
+ boolean tableMatch = tableMatch(tryTable);
+ boolean columnFamilyMatch = columnFamilyMatch(tryColumFamily);
+ boolean qualfierMatch = qualifierMatch(tryQualifier);
+
+ return tableMatch && columnFamilyMatch && qualfierMatch;
+ }
+
+ /**
+ * Test if the qualifier matches
+ * @param tryQualifier qualifier to test
+ * @return true if the qualifier matches
+ */
+ public boolean qualifierMatch(byte [] tryQualifier) {
+
+ if (qualifier != null) {
+ if (qualifierStartsWith && qualifierEndsWith) {
+ return (startsWith(tryQualifier, this.qualifier) || endsWith(tryQualifier, this.qualifier));
+ } else if (qualifierStartsWith) {
+ return startsWith(tryQualifier, this.qualifier);
+ } else if (qualifierEndsWith) {
+ return endsWith(tryQualifier, this.qualifier);
+ } else {
+ return Bytes.equals(this.qualifier, tryQualifier);
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Test if the column family matches the rule
+ * @param tryColumFamily column family to test
+ * @return true if the column family matches the rule
+ */
+ public boolean columnFamilyMatch(byte [] tryColumFamily) {
+ if (columnFamily != null) {
+ return Bytes.equals(this.columnFamily, tryColumFamily);
+ }
+ return true;
+ }
+
+ /**
+ * Test if the table matches the table in the rule
+ * @param tryTable table name to test
+ * @return true if the table matches the rule
+ */
+ public boolean tableMatch(TableName tryTable) {
+ if (tableName == null) {
+ return true;
+ }
+ return (tryTable.equals(this.tableName));
+ }
+
+ /**
+ * set the column family for the rule
+ * @param columnFamily column family to set
+ */
+ public void setColumnFamily(byte [] columnFamily) {
+ this.columnFamily = columnFamily;
+ }
+
+ /**
+ * set the qualifier value for the rule
+ * @param qualifier qualifier to set
+ */
+ public void setQualifier(byte []qualifier) {
+ this.qualifier = qualifier;
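+    // A leading "*" (e.g. *foo) means "qualifier ends with foo" and a trailing "*"
+    // (e.g. foo*) means "qualifier starts with foo"; the wildcard itself is stripped
+    // from the stored qualifier.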
+ if (startsWith(qualifier, ast)) {
+ qualifierEndsWith = true;
+ this.qualifier = ArrayUtils.subarray(this.qualifier, ast.length, this.qualifier.length);
+ }
+ if (endsWith(qualifier, ast)) {
+ qualifierStartsWith = true;
+ this.qualifier = ArrayUtils.subarray(this.qualifier, 0, this.qualifier.length - ast.length);
+ }
+ if ((qualifierStartsWith) || (qualifierEndsWith)) {
+ if (this.qualifier.length == 0) {
+ this.qualifier = null;
+ }
+ }
+
+ }
+
+ /**
+ * Tests if data starts with startsWith
+ * @param data byte array to test
+ * @param startsWith array that we want to see if data starts with
+ * @return true if data starts with startsWith
+ */
+ public static boolean startsWith(byte [] data, byte [] startsWith) {
+ if (startsWith.length > data.length) {
+ return false;
+ }
+
+ if (startsWith.length == data.length) {
+ return Bytes.equals(data, startsWith);
+ }
+
+ for (int i = 0; i < startsWith.length; i++) {
+ if (startsWith[i] != data[i]) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Tests if data ends with endsWith
+ * @param data byte array to test
+ * @param endsWith array that we want to see if data ends with
+ * @return true if data ends with endsWith
+ */
+ public static boolean endsWith(byte [] data, byte [] endsWith) {
+ if (endsWith.length > data.length) {
+ return false;
+ }
+
+ if (endsWith.length == data.length) {
+ return Bytes.equals(data, endsWith);
+ }
+
+ int endStart = data.length - endsWith.length;
+
+ for (int i = 0; i < endsWith.length; i++) {
+ //if (endsWith[i]!=data[(data.length-1)-(endsWith.length+i)]){
+ if (endsWith[i] != data[endStart + i]) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * get the table for the rule
+   * @return table name for the rule
+ */
+ public TableName getTableName() {
+ return tableName;
+ }
+
+ /**
+ * set the table for the rule
+ * @param tableName to set
+ */
+ public void setTableName(TableName tableName) {
+ this.tableName = tableName;
+ }
+
+ /**
+ * get the column family for the rule
+ * @return column family
+ */
+ public byte[] getColumnFamily() {
+ return columnFamily;
+ }
+
+ /**
+ * get the qualifier for the rule
+   * @return qualifier
+ */
+ public byte[] getQualifier() {
+ return qualifier;
+ }
+
+
+ /**
+   * indicates if the qualifier is a wildcard like *foo
+ * @return true if rule is like *foo
+ */
+ public boolean isQualifierEndsWith() {
+ return qualifierEndsWith;
+ }
+
+ /**
+   * indicates if the qualifier is a wildcard like foo*
+ * @return true if rule is like foo*
+ */
+ public boolean isQualifierStartsWith() {
+ return qualifierStartsWith;
+
+ }
+}
diff --git a/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/TopicRoutingRules.java b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/TopicRoutingRules.java
new file mode 100755
index 0000000000..c8b818c64b
--- /dev/null
+++ b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/TopicRoutingRules.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.kafka;
+
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+/**
+ * The topic routing/drop rules.
+ *
+ * <rules>
+ * <rule .... />
+ *
+ * </rules>
+ *
+ *
+ *
+ * A wildcard can only be at the beginning or at the end (can be at both sides).
+ *
+ * drop rules are always evaluated first.
+ *
+ * drop examples:
+ * <rule action="drop" table="default:MyTable" />
+ * Do not send replication events for table MyTable
+ *
+ * <rule action="drop" table="default:MyTable" columnFamily="data"/>
+ * Do not send replication events for table MyTable's column family data
+ *
+ * <rule action="drop" table="default:MyTable" columnFamily="data" qualifier="dhold:*"/>
+ * Do not send replication events for qualifiers starting with dhold: in table MyTable's column family data
+ *
+ * routeRules examples:
+ *
+ * <rule action="routeRules" table="default:MyTable" topic="mytopic"/>
+ * routeRules all replication events for table default:Mytable to topic mytopic
+ *
+ * <rule action="routeRules" table="default:MyTable" columnFamily="data" topic="mytopic"/>
+ * routeRules all replication events for table default:Mytable column family data to topic mytopic
+ *
+ * <rule action="routeRules" table="default:MyTable" columnFamily="data" topic="mytopic"
+ * qualifier="hold:*"/>
+ * routeRules all replication events for qualifiers that start with hold: for table
+ * default:Mytable column family data to topic mytopic
+ */
+@InterfaceAudience.Private
+public class TopicRoutingRules {
+
+  private List<DropRule> dropRules = new ArrayList<>();
+  private List<TopicRule> routeRules = new ArrayList<>();
+
+ private File sourceFile;
+
+ /**
+ * used for testing
+ */
+ public TopicRoutingRules() {
+
+ }
+
+ /**
+ * construct rule set from file
+   * @param source file that contains the rule set
+ * @throws Exception if load fails
+ */
+ public TopicRoutingRules(File source) throws Exception {
+ this.sourceFile = source;
+ this.reloadIfFile();
+ }
+
+ /**
+ * Reload the ruleset if it was parsed from a file
+ * @throws Exception error loading rule set
+ */
+ public void reloadIfFile() throws Exception {
+ if (this.sourceFile!=null){
+      List<DropRule> dropRulesSave = this.dropRules;
+      List<TopicRule> routeRulesSave = this.routeRules;
+
+      try (FileInputStream fin = new FileInputStream(this.sourceFile)) {
+        List<DropRule> dropRulesNew = new ArrayList<>();
+        List<TopicRule> routeRulesNew = new ArrayList<>();
+
+        parseRules(fin, dropRulesNew, routeRulesNew);
+
+ this.dropRules = dropRulesNew;
+ this.routeRules = routeRulesNew;
+
+ } catch (Exception e){
+ // roll back
+ this.dropRules=dropRulesSave;
+ this.routeRules=routeRulesSave;
+ // re-throw
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * parse rules manually from an input stream
+ * @param input InputStream that contains rule text
+ */
+ public void parseRules(InputStream input) {
+    List<DropRule> dropRulesNew = new ArrayList<>();
+    List<TopicRule> routeRulesNew = new ArrayList<>();
+    parseRules(input, dropRulesNew, routeRulesNew);
+ this.dropRules = dropRulesNew;
+ this.routeRules = routeRulesNew;
+ }
+
+ /**
+ * Parse the XML in the InputStream into route/drop rules and store them in the passed in Lists
+ * @param input inputstream the contains the ruleset
+ * @param dropRules list to accumulate drop rules
+ * @param routeRules list to accumulate route rules
+ */
+  public void parseRules(InputStream input, List<DropRule> dropRules, List<TopicRule> routeRules) {
+ try {
+ DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
+ DocumentBuilder dBuilder = dbFactory.newDocumentBuilder();
+ Document doc = dBuilder.parse(input);
+ NodeList nodList = doc.getElementsByTagName("rule");
+ for (int i = 0; i < nodList.getLength(); i++) {
+ if (nodList.item(i) instanceof Element) {
+ parseRule((Element) nodList.item(i),dropRules,routeRules);
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Parse a individual rule from a Element.
+ * @param n the element
+ * @param dropRules list to accumulate drop rules
+ * @param routeRules list to accumulate route rules
+ */
+  public void parseRule(Element n, List<DropRule> dropRules, List<TopicRule> routeRules) {
+ Rule r = null;
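+    // action="drop" produces a DropRule; any other action is treated as a routing
+    // rule that sends matching cells to the rule's topic attribute.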
+ if (n.getAttribute("action").equals("drop")) {
+ r = new DropRule();
+ dropRules.add((DropRule) r);
+ } else {
+ r = new TopicRule(n.getAttribute("topic"));
+ routeRules.add((TopicRule) r);
+ }
+ if (n.hasAttribute("table")) {
+ r.setTableName(TableName.valueOf(n.getAttribute("table")));
+ }
+ if (n.hasAttribute("columnFamily")) {
+ r.setColumnFamily(Bytes.toBytes(n.getAttribute("columnFamily")));
+ }
+ if (n.hasAttribute("qualifier")) {
+ String qual = n.getAttribute("qualifier");
+ r.setQualifier(Bytes.toBytes(qual));
+ }
+ }
+
+ /**
+ * Indicates if a cell mutation should be dropped instead of routed to kafka.
+ * @param table table name to check
+ * @param columnFamily column family to check
+ * @param qualifer qualifier name to check
+ * @return if the mutation should be dropped instead of routed to Kafka
+ */
+ public boolean isExclude(final TableName table, final byte []columnFamily,
+ final byte[] qualifer) {
+ for (DropRule r : getDropRules()) {
+ if (r.match(table, columnFamily, qualifer)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Get topics for the table/column family/qualifier combination
+ * @param table table name to check
+ * @param columnFamily column family to check
+ * @param qualifer qualifier name to check
+ * @return list of topics that match the passed in values (or empty for none).
+ */
+  public List<String> getTopics(TableName table, byte[] columnFamily, byte[] qualifer) {
+    List<String> ret = new ArrayList<>();
+ for (TopicRule r : getRouteRules()) {
+ if (r.match(table, columnFamily, qualifer)) {
+ ret.addAll(r.getTopics());
+ }
+ }
+
+ return ret;
+ }
+
+ /**
+ * returns all the drop rules (used for testing)
+ * @return drop rules
+ */
+  public List<DropRule> getDropRules() {
+ return dropRules;
+ }
+
+ /**
+ * returns all the route rules (used for testing)
+ * @return route rules
+ */
+  public List<TopicRule> getRouteRules() {
+ return routeRules;
+ }
+}
diff --git a/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/TopicRule.java b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/TopicRule.java
new file mode 100755
index 0000000000..5e5b6bfd94
--- /dev/null
+++ b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/TopicRule.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.kafka;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * If the Cell matches the rule returns the configured topics.
+ */
+@InterfaceAudience.Private
+public class TopicRule extends Rule {
+  private Set<String> topics = new HashSet<>();
+
+ public TopicRule(String topics) {
+ this.topics.addAll(Arrays.stream(topics.split(",")).collect(Collectors.toList()));
+ }
+
+  public Set<String> getTopics() {
+ return topics;
+ }
+}
diff --git a/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/ProducerForTesting.java b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/ProducerForTesting.java
new file mode 100755
index 0000000000..4e064684d4
--- /dev/null
+++ b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/ProducerForTesting.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hbase.kafka;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ProducerFencedException;
+
+/**
+ * Mock Kafka producer for testing; records each event sent to it, keyed by topic.
+ */
+public class ProducerForTesting implements Producer<byte[], byte[]> {
+ Map<String, List<HBaseKafkaEvent>> messages = new HashMap<>();
+ SpecificDatumReader<HBaseKafkaEvent> dreader = new SpecificDatumReader<>(HBaseKafkaEvent.SCHEMA$);
+
+ public Map<String, List<HBaseKafkaEvent>> getMessages() {
+ return messages;
+ }
+
+ @Override
+ public void abortTransaction() throws ProducerFencedException {
+ }
+
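+ /**
+ * Decodes the Avro payload of the record and stores the resulting HBaseKafkaEvent under its
+ * topic so tests can assert on what would have been published.
+ */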
+ @Override
+ public Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> producerRecord) {
+ try {
+
+ BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(producerRecord.value(), null);
+ HBaseKafkaEvent event = dreader.read(null, decoder);
+ if (!messages.containsKey(producerRecord.topic())) {
+ messages.put(producerRecord.topic(), new ArrayList<>());
+ }
+ messages.get(producerRecord.topic()).add(event);
+ return new Future<RecordMetadata>() {
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ // the mock future cannot be cancelled
+ return false;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ // the mock future is never cancelled
+ return false;
+ }
+
+ @Override
+ public boolean isDone() {
+ // not used by the tests; the record is captured synchronously in send()
+ return false;
+ }
+
+ @Override
+ public RecordMetadata get() throws InterruptedException, ExecutionException {
+ return new RecordMetadata(null, 1, 1, 1, 1, 1, 1);
+ }
+
+ @Override
+ public RecordMetadata get(long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ // not used by the tests
+ return null;
+ }
+
+ };
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> producerRecord,
+ Callback callback) {
+ return null;
+ }
+
+ @Override
+ public void flush() {
+ }
+
+ @Override
+ public List<PartitionInfo> partitionsFor(String s) {
+ return null;
+ }
+
+ @Override
+ public Map<MetricName, ? extends Metric> metrics() {
+ return null;
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public void close(long l, TimeUnit timeUnit) {
+ }
+
+ @Override
+ public void initTransactions() {
+ }
+
+ @Override
+ public void beginTransaction() throws ProducerFencedException {
+ }
+
+ @Override
+ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
+ String consumerGroupId) throws ProducerFencedException {
+ }
+
+ @Override
+ public void commitTransaction() throws ProducerFencedException {
+ }
+}
diff --git a/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestDropRule.java b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestDropRule.java
new file mode 100755
index 0000000000..0565d041ff
--- /dev/null
+++ b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestDropRule.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hbase.kafka;
+
+import java.io.ByteArrayInputStream;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test different cases of drop rules.
+ */
+
+@Category(SmallTests.class)
+public class TestDropRule {
+ private static final String DROP_RULE1 =
+ "";
+ private static final String DROP_RULE2 =
+ "";
+ private static final String DROP_RULE3 =
+ "";
+
+ private static final String DROP_RULE4 =
+ "";
+ private static final String DROP_RULE5 =
+ "";
+
+ private static final String DROP_RULE6 =
+ "";
+
+ @Test
+ public void testDropies1() {
+ TopicRoutingRules rules = new TopicRoutingRules();
+ try {
+ rules.parseRules(new ByteArrayInputStream(DROP_RULE1.getBytes()));
+ Assert.assertEquals(1, rules.getDropRules().size());
+ Assert.assertEquals(TableName.valueOf("default:MyTable"),
+ rules.getDropRules().get(0).getTableName());
+ Assert.assertEquals(null, rules.getDropRules().get(0).getColumnFamily());
+ Assert.assertEquals(null, rules.getDropRules().get(0).getQualifier());
+ Assert.assertEquals(0, rules.getRouteRules().size());
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testDropies2() {
+ TopicRoutingRules rules = new TopicRoutingRules();
+ try {
+ rules.parseRules(new ByteArrayInputStream(DROP_RULE2.getBytes()));
+ Assert.assertEquals(1, rules.getDropRules().size());
+ Assert.assertEquals(TableName.valueOf("default:MyTable"),
+ rules.getDropRules().get(0).getTableName());
+ Assert.assertTrue(
+ Bytes.equals("data".getBytes(), rules.getDropRules().get(0).getColumnFamily()));
+ Assert.assertEquals(null, rules.getDropRules().get(0).getQualifier());
+ Assert.assertEquals(0, rules.getRouteRules().size());
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testDropies3() {
+ TopicRoutingRules rules = new TopicRoutingRules();
+ try {
+ rules.parseRules(new ByteArrayInputStream(DROP_RULE3.getBytes()));
+ Assert.assertEquals(1, rules.getDropRules().size());
+ Assert.assertEquals(TableName.valueOf("default:MyTable"),
+ rules.getDropRules().get(0).getTableName());
+ Assert.assertTrue(
+ Bytes.equals("data".getBytes(), rules.getDropRules().get(0).getColumnFamily()));
+ Assert
+ .assertTrue(Bytes.equals("dhold".getBytes(), rules.getDropRules().get(0).getQualifier()));
+ Assert.assertEquals(0, rules.getRouteRules().size());
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testDropies4() {
+ TopicRoutingRules rules = new TopicRoutingRules();
+ try {
+ rules.parseRules(new ByteArrayInputStream(DROP_RULE4.getBytes()));
+ Assert.assertEquals(1, rules.getDropRules().size());
+ Assert.assertEquals(TableName.valueOf("default:MyTable"),
+ rules.getDropRules().get(0).getTableName());
+ Assert.assertTrue(
+ Bytes.equals("data".getBytes(), rules.getDropRules().get(0).getColumnFamily()));
+ Assert.assertTrue(
+ Bytes.equals("dhold:".getBytes(), rules.getDropRules().get(0).getQualifier()));
+ Assert.assertEquals(0, rules.getRouteRules().size());
+
+ DropRule drop = rules.getDropRules().get(0);
+ Assert.assertFalse(
+ drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(), "blah".getBytes()));
+ Assert.assertFalse(
+ drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(), "dholdme".getBytes()));
+ Assert.assertTrue(
+ drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(), "dhold:me".getBytes()));
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testDropies5() {
+ TopicRoutingRules rules = new TopicRoutingRules();
+ try {
+ rules.parseRules(new ByteArrayInputStream(DROP_RULE5.getBytes()));
+ Assert.assertEquals(1, rules.getDropRules().size());
+ Assert.assertEquals(TableName.valueOf("default:MyTable"),
+ rules.getDropRules().get(0).getTableName());
+ Assert.assertTrue(
+ Bytes.equals("data".getBytes(), rules.getDropRules().get(0).getColumnFamily()));
+ Assert.assertTrue(
+ Bytes.equals("pickme".getBytes(), rules.getDropRules().get(0).getQualifier()));
+ Assert.assertEquals(0, rules.getRouteRules().size());
+
+ DropRule drop = rules.getDropRules().get(0);
+ Assert.assertFalse(
+ drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(), "blah".getBytes()));
+ Assert.assertFalse(drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(),
+ "blacickme".getBytes()));
+ Assert.assertTrue(drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(),
+ "hithere.pickme".getBytes()));
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testDropies6() {
+ TopicRoutingRules rules = new TopicRoutingRules();
+ try {
+ rules.parseRules(new ByteArrayInputStream(DROP_RULE6.getBytes()));
+ Assert.assertEquals(1, rules.getDropRules().size());
+ Assert.assertEquals(TableName.valueOf("default:MyTable"),
+ rules.getDropRules().get(0).getTableName());
+ Assert.assertTrue(
+ Bytes.equals("data".getBytes(), rules.getDropRules().get(0).getColumnFamily()));
+ Assert.assertTrue(
+ Bytes.equals("pickme".getBytes(), rules.getDropRules().get(0).getQualifier()));
+ Assert.assertEquals(0, rules.getRouteRules().size());
+
+ DropRule drop = rules.getDropRules().get(0);
+ Assert.assertFalse(
+ drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(), "blah".getBytes()));
+ Assert.assertFalse(drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(),
+ "blacickme".getBytes()));
+ Assert.assertTrue(drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(),
+ "hithere.pickme".getBytes()));
+ Assert.assertTrue(drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(),
+ "pickme.pleaze.do.it".getBytes()));
+ Assert.assertFalse(drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(),
+ "please.pickme.pleaze".getBytes()));
+ Assert.assertTrue(drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(),
+ "pickme.pleaze.pickme".getBytes()));
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+}
diff --git a/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestProcessMutations.java b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestProcessMutations.java
new file mode 100755
index 0000000000..e00467d1f2
--- /dev/null
+++ b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestProcessMutations.java
@@ -0,0 +1,90 @@
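+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */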
+package org.apache.hadoop.hbase.kafka;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test that mutations are published to the configured Kafka topic.
+ */
+@Category(SmallTests.class)
+public class TestProcessMutations {
+ private User user = new User() {
+ @Override
+ public String getShortName() {
+ return "my name";
+ }
+
+ @Override
+ public <T> T runAs(PrivilegedAction<T> action) {
+ return null;
+ }
+
+ @Override
+ public <T> T runAs(PrivilegedExceptionAction<T> action) throws IOException, InterruptedException {
+ return null;
+ }
+ };
+
+ private static final String ROUTE_RULE1 =
+ "";
+
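+ // mock producer that records the events the bridge connection would publish to Kafka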
+ ProducerForTesting myTestingProducer;
+
+ @Before
+ public void setup() {
+ this.myTestingProducer = new ProducerForTesting();
+ }
+
+ @After
+ public void tearDown() {
+
+ }
+
+ @Test
+ public void testSendMessage() {
+ TopicRoutingRules rules = new TopicRoutingRules();
+ try {
+
+ //Configuration conf, ExecutorService pool, User user,
+ // TopicRoutingRules routingRules,Producer producer
+
+ rules.parseRules(new ByteArrayInputStream(ROUTE_RULE1.getBytes()));
+ Configuration conf = new Configuration();
+ KafkaBridgeConnection connection =
+ new KafkaBridgeConnection(conf, Executors.newSingleThreadExecutor(), user, rules, myTestingProducer);
+ long zeTimestamp = System.currentTimeMillis();
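+ // only the cell with qualifier "foo" is expected to be routed to the producer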
+ Put put = new Put("key1".getBytes(), zeTimestamp);
+ put.addColumn("FAMILY".getBytes(), "not foo".getBytes(), "VALUE should NOT pass".getBytes());
+ put.addColumn("FAMILY".getBytes(), "foo".getBytes(), "VALUE should pass".getBytes());
+ Table myTable = connection.getTable(TableName.valueOf("MyNamespace:MyTable"));
+ List rows = new ArrayList<>();
+ rows.add(put);
+ myTable.batch(rows, new Object[0]);
+
+ Assert.assertFalse(myTestingProducer.getMessages().isEmpty());
+
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+
+}
diff --git a/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestQualifierMatching.java b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestQualifierMatching.java
new file mode 100755
index 0000000000..3cef4e8dbd
--- /dev/null
+++ b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestQualifierMatching.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hbase.kafka;
+
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Make sure qualifier match rules work.
+ */
+@Category(SmallTests.class)
+public class TestQualifierMatching {
+
+ @Test
+ public void testMatchQualifier() {
+ DropRule rule = new DropRule();
+ rule.setQualifier("data".getBytes());
+ Assert.assertTrue(rule.qualifierMatch("data".getBytes()));
+
+ rule = new DropRule();
+ rule.setQualifier("data1".getBytes());
+ Assert.assertFalse(rule.qualifierMatch("data".getBytes()));
+
+ // if not set, it is a wildcard
+ rule = new DropRule();
+ Assert.assertTrue(rule.qualifierMatch("data".getBytes()));
+ }
+
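+ // a trailing '*' on the configured qualifier makes it a prefix match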
+ @Test
+ public void testStartWithQualifier() {
+ DropRule rule = new DropRule();
+ rule.setQualifier("data*".getBytes());
+ Assert.assertTrue(rule.isQualifierStartsWith());
+ Assert.assertFalse(rule.isQualifierEndsWith());
+
+ Assert.assertTrue(rule.qualifierMatch("data".getBytes()));
+ Assert.assertTrue(rule.qualifierMatch("data1".getBytes()));
+ Assert.assertTrue(rule.qualifierMatch("datafoobar".getBytes()));
+ Assert.assertFalse(rule.qualifierMatch("datfoobar".getBytes()));
+ Assert.assertFalse(rule.qualifierMatch("d".getBytes()));
+ Assert.assertFalse(rule.qualifierMatch("".getBytes()));
+ }
+
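+ // a leading '*' on the configured qualifier makes it a suffix match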
+ @Test
+ public void testEndsWithQualifier() {
+ DropRule rule = new DropRule();
+ rule.setQualifier("*data".getBytes());
+ Assert.assertFalse(rule.isQualifierStartsWith());
+ Assert.assertTrue(rule.isQualifierEndsWith());
+
+ Assert.assertTrue(rule.qualifierMatch("data".getBytes()));
+ Assert.assertTrue(rule.qualifierMatch("1data".getBytes()));
+ Assert.assertTrue(rule.qualifierMatch("foobardata".getBytes()));
+ Assert.assertFalse(rule.qualifierMatch("foobardat".getBytes()));
+ Assert.assertFalse(rule.qualifierMatch("d".getBytes()));
+ Assert.assertFalse(rule.qualifierMatch("".getBytes()));
+ }
+
+}
diff --git a/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestRouteRules.java b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestRouteRules.java
new file mode 100755
index 0000000000..813d6dfd6b
--- /dev/null
+++ b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestRouteRules.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hbase.kafka;
+
+import java.io.ByteArrayInputStream;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test the route rules.
+ */
+@Category(SmallTests.class)
+public class TestRouteRules {
+ private static final String ROUTE_RULE1 =
+ "";
+ private static final String ROUTE_RULE2 =
+ "";
+ private static final String ROUTE_RULE3 =
+ "";
+
+ private static final String ROUTE_RULE4 =
+ "";
+ private static final String ROUTE_RULE5 =
+ "";
+
+ private static final String ROUTE_RULE6 =
+ "";
+
+ @Test
+ public void testTopic1() {
+ TopicRoutingRules rules = new TopicRoutingRules();
+ try {
+ rules.parseRules(new ByteArrayInputStream(ROUTE_RULE1.getBytes()));
+ Assert.assertEquals(1, rules.getRouteRules().size());
+ Assert.assertEquals(TableName.valueOf("default:MyTable"),
+ rules.getRouteRules().get(0).getTableName());
+ Assert.assertEquals(null, rules.getRouteRules().get(0).getColumnFamily());
+ Assert.assertEquals(null, rules.getRouteRules().get(0).getQualifier());
+ Assert.assertEquals(0, rules.getDropRules().size());
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testTopic2() {
+ TopicRoutingRules rules = new TopicRoutingRules();
+ try {
+ rules.parseRules(new ByteArrayInputStream(ROUTE_RULE2.getBytes()));
+ Assert.assertEquals(1, rules.getRouteRules().size());
+ Assert.assertEquals(TableName.valueOf("default:MyTable"),
+ rules.getRouteRules().get(0).getTableName());
+ Assert.assertTrue(
+ Bytes.equals("data".getBytes(), rules.getRouteRules().get(0).getColumnFamily()));
+ Assert.assertEquals(null, rules.getRouteRules().get(0).getQualifier());
+ Assert.assertEquals(0, rules.getDropRules().size());
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testTopic3() {
+ TopicRoutingRules rules = new TopicRoutingRules();
+ try {
+ rules.parseRules(new ByteArrayInputStream(ROUTE_RULE3.getBytes()));
+ Assert.assertEquals(1, rules.getRouteRules().size());
+ Assert.assertEquals(TableName.valueOf("default:MyTable"),
+ rules.getRouteRules().get(0).getTableName());
+ Assert.assertTrue(
+ Bytes.equals("data".getBytes(), rules.getRouteRules().get(0).getColumnFamily()));
+ Assert.assertTrue(
+ Bytes.equals("dhold".getBytes(), rules.getRouteRules().get(0).getQualifier()));
+ Assert.assertTrue(rules.getRouteRules().get(0).getTopics().contains("foo"));
+ Assert.assertEquals(1, rules.getRouteRules().get(0).getTopics().size());
+
+ Assert.assertEquals(0, rules.getDropRules().size());
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testTopic4() {
+ TopicRoutingRules rules = new TopicRoutingRules();
+ try {
+ rules.parseRules(new ByteArrayInputStream(ROUTE_RULE4.getBytes()));
+ Assert.assertEquals(1, rules.getRouteRules().size());
+ Assert.assertEquals(TableName.valueOf("default:MyTable"),
+ rules.getRouteRules().get(0).getTableName());
+ Assert.assertTrue(
+ Bytes.equals("data".getBytes(), rules.getRouteRules().get(0).getColumnFamily()));
+ Assert.assertTrue(
+ Bytes.equals("dhold:".getBytes(), rules.getRouteRules().get(0).getQualifier()));
+ Assert.assertEquals(0, rules.getDropRules().size());
+
+ TopicRule route = rules.getRouteRules().get(0);
+ Assert.assertFalse(
+ route.match(TableName.valueOf("default:MyTable"), "data".getBytes(), "blah".getBytes()));
+ Assert.assertFalse(
+ route.match(TableName.valueOf("default:MyTable"), "data".getBytes(), "dholdme".getBytes()));
+ Assert.assertTrue(route.match(TableName.valueOf("default:MyTable"), "data".getBytes(),
+ "dhold:me".getBytes()));
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testTopic5() {
+ TopicRoutingRules rules = new TopicRoutingRules();
+ try {
+ rules.parseRules(new ByteArrayInputStream(ROUTE_RULE5.getBytes()));
+ Assert.assertEquals(1, rules.getRouteRules().size());
+ Assert.assertEquals(TableName.valueOf("default:MyTable"),
+ rules.getRouteRules().get(0).getTableName());
+ Assert.assertTrue(
+ Bytes.equals("data".getBytes(), rules.getRouteRules().get(0).getColumnFamily()));
+ Assert.assertTrue(
+ Bytes.equals("pickme".getBytes(), rules.getRouteRules().get(0).getQualifier()));
+ Assert.assertEquals(0, rules.getDropRules().size());
+
+ TopicRule route = rules.getRouteRules().get(0);
+ Assert.assertFalse(
+ route.match(TableName.valueOf("default:MyTable"), "data".getBytes(), "blah".getBytes()));
+ Assert.assertFalse(route.match(TableName.valueOf("default:MyTable"), "data".getBytes(),
+ "blacickme".getBytes()));
+ Assert.assertTrue(route.match(TableName.valueOf("default:MyTable"), "data".getBytes(),
+ "hithere.pickme".getBytes()));
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testTopic6() {
+ TopicRoutingRules rules = new TopicRoutingRules();
+ try {
+ rules.parseRules(new ByteArrayInputStream(ROUTE_RULE6.getBytes()));
+ Assert.assertEquals(1, rules.getRouteRules().size());
+ Assert.assertEquals(TableName.valueOf("default:MyTable"),
+ rules.getRouteRules().get(0).getTableName());
+ Assert.assertTrue(
+ Bytes.equals("data".getBytes(), rules.getRouteRules().get(0).getColumnFamily()));
+ Assert.assertTrue(
+ Bytes.equals("pickme".getBytes(), rules.getRouteRules().get(0).getQualifier()));
+ Assert.assertEquals(0, rules.getDropRules().size());
+
+ TopicRule route = rules.getRouteRules().get(0);
+ Assert.assertFalse(
+ route.match(TableName.valueOf("default:MyTable"), "data".getBytes(), "blah".getBytes()));
+ Assert.assertFalse(route.match(TableName.valueOf("default:MyTable"), "data".getBytes(),
+ "blacickme".getBytes()));
+ Assert.assertTrue(route.match(TableName.valueOf("default:MyTable"), "data".getBytes(),
+ "hithere.pickme".getBytes()));
+ Assert.assertTrue(route.match(TableName.valueOf("default:MyTable"), "data".getBytes(),
+ "pickme.pleaze.do.it".getBytes()));
+ Assert.assertFalse(route.match(TableName.valueOf("default:MyTable"), "data".getBytes(),
+ "please.pickme.pleaze".getBytes()));
+ Assert.assertTrue(route.match(TableName.valueOf("default:MyTable"), "data".getBytes(),
+ "pickme.pleaze.pickme".getBytes()));
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
old mode 100644
new mode 100755
diff --git a/pom.xml b/pom.xml
index ea02fe8c3c..a4188bcffe 100755
--- a/pom.xml
+++ b/pom.xml
@@ -93,6 +93,8 @@
     <module>hbase-spark-it</module>
     <module>hbase-backup</module>
     <module>hbase-zookeeper</module>
+    <module>hbase-kafka-model</module>
+    <module>hbase-kafka-proxy</module>
@@ -1796,6 +1798,16 @@
         <groupId>org.apache.hbase</groupId>
         <version>${project.version}</version>
       </dependency>
+      <dependency>
+        <artifactId>hbase-kafka-model</artifactId>
+        <groupId>org.apache.hbase</groupId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <artifactId>hbase-kafka-proxy</artifactId>
+        <groupId>org.apache.hbase</groupId>
+        <version>${project.version}</version>
+      </dependency>
       <dependency>
         <artifactId>hbase-metrics-api</artifactId>
         <groupId>org.apache.hbase</groupId>