diff --git a/bin/hbase b/bin/hbase index 998bdbeb0c..ad9f2109b0 100755 --- a/bin/hbase +++ b/bin/hbase @@ -96,6 +96,7 @@ if [ $# = 0 ]; then echo " rest Run an HBase REST server" echo " thrift Run the HBase Thrift server" echo " thrift2 Run the HBase Thrift2 server" + echo " kafkaproxy Run the HBase Kafka Proxy server" echo " clean Run the HBase clean up script" echo " classpath Dump hbase CLASSPATH" echo " mapredcp Dump CLASSPATH entries required by mapreduce" @@ -279,7 +280,7 @@ fi unset IFS #Set the right GC options based on the what we are running -declare -a server_cmds=("master" "regionserver" "thrift" "thrift2" "rest" "avro" "zookeeper") +declare -a server_cmds=("master" "regionserver" "thrift" "thrift2" "rest" "avro" "zookeeper" "kafkaproxy") for cmd in ${server_cmds[@]}; do if [[ $cmd == $COMMAND ]]; then server=true @@ -410,6 +411,11 @@ elif [ "$COMMAND" = "thrift2" ] ; then if [ "$1" != "stop" ] ; then HBASE_OPTS="$HBASE_OPTS $HBASE_THRIFT_OPTS" fi +elif [ "$COMMAND" = "kafkaproxy" ] ; then + CLASS='org.apache.hadoop.hbase.kafka.KafkaProxyServer' + if [ "$1" != "stop" ] ; then + HBASE_OPTS="$HBASE_OPTS $HBASE_KAFKA_OPTS" + fi elif [ "$COMMAND" = "rest" ] ; then CLASS='org.apache.hadoop.hbase.rest.RESTServer' if [ "$1" != "stop" ] ; then diff --git a/hbase-assembly/pom.xml b/hbase-assembly/pom.xml index 71e90f395b..00149ff3a4 100644 --- a/hbase-assembly/pom.xml +++ b/hbase-assembly/pom.xml @@ -303,6 +303,15 @@ org.apache.hbase hbase-examples + + org.apache.hbase + hbase-kafka-model + + + org.apache.hbase + hbase-kafka-proxy + + diff --git a/hbase-http/pom.xml b/hbase-http/pom.xml index 13fae092f7..bf32232280 100644 --- a/hbase-http/pom.xml +++ b/hbase-http/pom.xml @@ -277,7 +277,10 @@ javax.ws.rs javax.ws.rs-api - + + com.fasterxml.jackson.core + jackson-databind + org.apache.kerby diff --git a/hbase-kafka-model/pom.xml b/hbase-kafka-model/pom.xml new file mode 100755 index 0000000000..2000e57131 --- /dev/null +++ b/hbase-kafka-model/pom.xml @@ -0,0 +1,326 @@ + + + + 4.0.0 + + + hbase-build-configuration + org.apache.hbase + 3.0.0-SNAPSHOT + ../hbase-build-configuration + + + + hbase-kafka-model + Apache HBase - Model Objects for Kafka Proxy + Model objects that can be used consume hbase-kafka-proxy messages + + + + + + org.apache.avro + avro + + + + + + ${project.basedir}/target/java + + + src/main/resources/ + + hbase-default.xml + + + + + + src/test/resources/META-INF/ + META-INF/ + + NOTICE + + true + + + + + org.apache.avro + avro-maven-plugin + ${avro.version} + + + generate-sources + + schema + + + ${project.basedir}/src/main/avro/ + ${project.basedir}/target/java/ + + **/*.avro + + + + + + + + + org.apache.maven.plugins + maven-remote-resources-plugin + + + org.apache.maven.plugins + maven-site-plugin + + true + + + + + maven-assembly-plugin + + true + + + + maven-surefire-plugin + + + + listener + org.apache.hadoop.hbase.ResourceCheckerJUnitListener + + + + + + + org.apache.maven.plugins + maven-source-plugin + + + hbase-default.xml + + + + + + + + + org.eclipse.m2e + lifecycle-mapping + 1.0.0 + + + + + + org.apache.maven.plugins + maven-antrun-plugin + [${maven.antrun.version}] + + run + + + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + [2.8,) + + build-classpath + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + [3.2,) + + compile + + + + + + + + + + + + + + + + + + + apache-release + + + + org.apache.maven.plugins + maven-resources-plugin + + + license-javadocs + prepare-package + + copy-resources + + + ${project.build.directory}/apidocs + + + src/main/javadoc/META-INF/ + META-INF/ + + NOTICE + + true + + + + + + + + + + + + skipCommonTests + + + skipCommonTests + + + + true + true + + + + + + + hadoop-2.0 + + + + + !hadoop.profile + + + + + org.apache.hadoop + hadoop-common + + + + + + maven-dependency-plugin + + + create-mrapp-generated-classpath + generate-test-resources + + build-classpath + + + + ${project.build.directory}/test-classes/mrapp-generated-classpath + + + + + + + + + + + hadoop-3.0 + + + hadoop.profile + 3.0 + + + + 3.0-SNAPSHOT + + + + org.apache.hadoop + hadoop-common + + + + + + maven-dependency-plugin + + + create-mrapp-generated-classpath + generate-test-resources + + build-classpath + + + + ${project.build.directory}/test-classes/mrapp-generated-classpath + + + + + + + + + diff --git a/hbase-kafka-model/src/main/avro/HbaseKafkaEvent.avro b/hbase-kafka-model/src/main/avro/HbaseKafkaEvent.avro new file mode 100755 index 0000000000..a5c016deae --- /dev/null +++ b/hbase-kafka-model/src/main/avro/HbaseKafkaEvent.avro @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +{"namespace": "org.apache.hadoop.hbase.kafka", + "type": "record", + "name": "HbaseKafkaEvent", + "fields": [ + {"name": "key", "type": "bytes"}, + {"name": "timestamp", "type": "long" }, + {"name": "delete", "type": "boolean" }, + {"name": "value", "type": "bytes"}, + {"name": "qualifier", "type": "bytes"}, + {"name": "family", "type": "bytes"}, + {"name": "table", "type": "bytes"} + ] +} diff --git a/hbase-kafka-proxy/pom.xml b/hbase-kafka-proxy/pom.xml new file mode 100755 index 0000000000..939576fd2f --- /dev/null +++ b/hbase-kafka-proxy/pom.xml @@ -0,0 +1,329 @@ + + + + 4.0.0 + + hbase-build-configuration + org.apache.hbase + 3.0.0-SNAPSHOT + ../hbase-build-configuration + + + hbase-kafka-proxy + Apache HBase - Kafka Proxy + Proxy that forwards HBase replication events to a Kakfa broker + + + + + org.apache.maven.plugins + maven-remote-resources-plugin + + + org.apache.maven.plugins + maven-site-plugin + + true + + + + + maven-assembly-plugin + + true + + + + maven-surefire-plugin + + + + listener + org.apache.hadoop.hbase.ResourceCheckerJUnitListener + + + + + + + + + + org.eclipse.m2e + lifecycle-mapping + 1.0.0 + + + + + + org.apache.maven.plugins + maven-antrun-plugin + [${maven.antrun.version}] + + run + + + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + [2.8,) + + build-classpath + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + [3.2,) + + compile + + + + + + + + + + + + + + + + org.apache.avro + avro + + + org.apache.hbase + hbase-kafka-model + compile + + + org.apache.hbase + hbase-common + + + org.apache.hbase + hbase-client + + + org.apache.hbase + hbase-server + + + org.apache.hbase + hbase-annotations + ${project.version} + + + org.apache.kafka + kafka-clients + 1.0.0 + + + + commons-logging + commons-logging + + + commons-codec + commons-codec + compile + + + org.apache.commons + commons-lang3 + + + org.apache.commons + commons-collections4 + compile + + + commons-io + commons-io + compile + + + com.google.protobuf + protobuf-java + + + org.apache.commons + commons-crypto + + + + + + + apache-release + + + + org.apache.maven.plugins + maven-resources-plugin + + + license-javadocs + prepare-package + + copy-resources + + + ${project.build.directory}/apidocs + + + src/main/javadoc/META-INF/ + META-INF/ + + NOTICE + + true + + + + + + + + + + + + skipCommonTests + + + skipCommonTests + + + + true + true + + + + + + + hadoop-2.0 + + + + + !hadoop.profile + + + + + org.apache.hadoop + hadoop-common + + + + + + maven-dependency-plugin + + + create-mrapp-generated-classpath + generate-test-resources + + build-classpath + + + + ${project.build.directory}/test-classes/mrapp-generated-classpath + + + + + + + + + + + + hadoop-3.0 + + + hadoop.profile + 3.0 + + + + 3.0-SNAPSHOT + + + + org.apache.hadoop + hadoop-common + + + + + + maven-dependency-plugin + + + create-mrapp-generated-classpath + generate-test-resources + + build-classpath + + + + ${project.build.directory}/test-classes/mrapp-generated-classpath + + + + + + + + + + diff --git a/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DropRule.java b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DropRule.java new file mode 100755 index 0000000000..6f51431563 --- /dev/null +++ b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DropRule.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.kafka; + +/** + * Rule that indicates the Cell should not be replicated + */ +public class DropRule extends Rule { + public DropRule() { + } +} diff --git a/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DumpToStringListener.java b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DumpToStringListener.java new file mode 100755 index 0000000000..e42d045a48 --- /dev/null +++ b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DumpToStringListener.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.kafka; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.Properties; +import java.util.stream.Collectors; + +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.VersionInfo; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; + + + +/** + * connects to kafka and reads from the passed in topics. Parses each message into an avro object + * and dumps it to the console. + */ +public final class DumpToStringListener { + private static final Log LOG = LogFactory.getLog(DumpToStringListener.class); + + private DumpToStringListener(){ + + } + + public static void main(String[] args) { + LOG.info("***** STARTING service '" + DumpToStringListener.class.getSimpleName() + "' *****"); + VersionInfo.logVersion(); + + Options options = new Options(); + options.addOption("k", "kafkabrokers", true, "Kafka Brokers " + + "(comma delimited)"); + options.addOption("t", "kafkatopics", true, "Kaf" + + "ka Topics to subscribe to (comma delimited)"); + CommandLine commandLine = null; + try { + commandLine = new DefaultParser().parse(options, args); + } catch (ParseException e) { + LOG.error("Could not parse: ", e); + printUsageAndExit(options, -1); + } + SpecificDatumReader dreader = + new SpecificDatumReader<>(HbaseKafkaEvent.SCHEMA$); + + String topic = commandLine.getOptionValue('t'); + + Properties props = new Properties(); + props.put("bootstrap.servers", commandLine.getOptionValue('k')); + props.put("group.id", "hbase kafka test tool"); + props.put("key.deserializer", ByteArrayDeserializer.class.getName()); + props.put("value.deserializer", ByteArrayDeserializer.class.getName()); + final KafkaConsumer consumer = new KafkaConsumer<>(props); + consumer.subscribe(Arrays.stream(topic.split(",")).collect(Collectors.toList())); + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + consumer.close(); + } + }); + + while (true) { + ConsumerRecords records = consumer.poll(10000); + Iterator> it = records.iterator(); + while (it.hasNext()) { + ConsumerRecord record = it.next(); + BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(record.value(), null); + try { + HbaseKafkaEvent event = dreader.read(null, decoder); + LOG.info("key :" + Bytes.toString(record.key()) + " value " + event); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + } + + private static void printUsageAndExit(Options options, int exitCode) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("hbase " + DumpToStringListener.class.getName(), "", options, + "\n[--kafkabrokers ] " + + "[-k ] \n", true); + System.exit(exitCode); + } + +} diff --git a/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaProxyServer.java b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaProxyServer.java new file mode 100755 index 0000000000..95b37b448d --- /dev/null +++ b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaProxyServer.java @@ -0,0 +1,309 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.kafka; + +import java.io.File; +import java.io.FileInputStream; +import java.util.Properties; +import java.util.UUID; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.builder.ReflectionToStringBuilder; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.VersionInfo; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; + +/** + * hbase to kafka bridge. + * + * Starts up a region server and receives replication events, just like a peer + * cluster member. It takes the events and cell by cell determines how to + * route them (see kafka-route-rules.xml) + */ +public class KafkaProxyServer extends Configured implements Tool { + public static final char INTERNAL_HYPHEN_REPLACEMENT = '\u1400'; + + private final Log LOG = LogFactory.getLog(getClass()); + + public static void main(String[] args) { + try { + ToolRunner.run(new KafkaProxyServer(), args); + } catch (Exception e) { + e.printStackTrace(); + } + } + + private static void printUsageAndExit(Options options, int exitCode) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("hbase kafkaproxy start", "", options, + "\nTo run the kafka proxy as a daemon, execute " + + "hbase-daemon.sh start|stop kafkaproxy " + + "[--kafkabrokers ] " + + "[-k ] " + + "[--peername name of hbase peer to use (defaults to hbasekafka)] " + + "[-p name of hbase peer to use (defaults to hbasekafka)] " + + "[--auto auto create peer] " + + "[-a auto create peer] \n", true); + System.exit(exitCode); + } + + @Override + public int run(String[] args) { + LOG.info("***** STARTING service '" + KafkaProxyServer.class.getSimpleName() + "' *****"); + VersionInfo.logVersion(); + + Options options = new Options(); + + options.addOption("k", "kafkabrokers", true, + "Kafka Brokers (comma delimited)"); + options.addOption("p", "peername", true, + "Name of hbase peer"); + options.addOption("a", "autopeer", false, + "Create a peer auotmatically to the hbase cluster"); + options.addOption("f", "kafkaproperties", false, + "Path to properties file that has the kafka connection properties"); + + Option o = new Option("r", "routerulesfile", true, "file that has routing rules"); + o.setRequired(true); + options.addOption(o); + + CommandLine commandLine = null; + try { + commandLine = new DefaultParser().parse(options, args); + } catch (ParseException e) { + LOG.error("Could not parse: ", e); + printUsageAndExit(options, -1); + } + + Configuration conf = HBaseConfiguration.create(getConf()); + String zookeeperQ = conf.get("hbase.zookeeper.quorum") + ":" + + conf.get("hbase.zookeeper.property.clientPort"); + String kafkaServers = commandLine.getOptionValue('k'); + + String routeRulesFile = commandLine.getOptionValue('r'); + TopicRoutingRules rrules = new TopicRoutingRules(); + try (FileInputStream fin = new FileInputStream(routeRulesFile);){ + rrules.parseRules(fin); + } catch (Exception e) { + LOG.error("Rule file " + routeRulesFile + " not found or invalid"); + System.exit(-1); + } + + if ((commandLine.getOptionValue('f')==null)&&(commandLine.getOptionValue('k')==null)) { + System.err.println("You must provide a list of kafka brokers or a properties " + + "file with the connection properties"); + System.exit(-1); + } + + String peerName = commandLine.getOptionValue('p'); + if (peerName == null) { + peerName = "hbasekafka"; + } + + boolean createPeer = commandLine.hasOption('a'); + + LOG.info("using peer named " + peerName + " automatically create? " + createPeer); + + peerName = toInternalSubscriptionName(peerName); + + ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(20000, 20); + + try (CuratorFramework zk = CuratorFrameworkFactory.newClient(zookeeperQ, retryPolicy); + ) { + + zk.start(); + + final String internalName = toInternalSubscriptionName(peerName); + + String basePath = "/hbasekafkaproxy"; + // always gives the same uuid for the same name + UUID uuid = UUID.nameUUIDFromBytes(Bytes.toBytes(internalName)); + + if (zk.checkExists().forPath(basePath + "/hbaseid") == null) { + zk.create().creatingParentsIfNeeded().forPath(basePath + + "/hbaseid", Bytes.toBytes(uuid.toString())); + } + + if (zk.checkExists().forPath(basePath + "/rs") == null) { + zk.create().forPath(basePath + "/rs"); + } + + // look for and connect to the peer + checkForOrCreateReplicationPeer(conf, zk, basePath, internalName, createPeer); + + Properties configProperties = new Properties(); + + if (commandLine.getOptionValue('f') != null){ + try (FileInputStream fs = new java.io.FileInputStream( + new File(commandLine.getOptionValue('f')))){ + configProperties.load(fs); + } + } else { + configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers); + } + + configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.ByteArraySerializer"); + configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.ByteArraySerializer"); + Producer producer = new KafkaProducer<>(configProperties); + + ProxyHRegionServer proxy = new ProxyHRegionServer(zk, conf, peerName, producer, rrules); + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + LOG.info("***** STOPPING service '" + + KafkaProxyServer.class.getSimpleName() + "' *****"); + try ( + // disable peer if we added it with the -a flag + Connection con = ConnectionFactory.createConnection(conf); + Admin admin = con.getAdmin();) { + LOG.info("disable peer " + internalName + " since it was sdded with the -a flag"); + admin.disableReplicationPeer(internalName); + } catch (Exception e) { + LOG.error("unable to pause peer " + internalName); + e.printStackTrace(); + System.exit(1); + } + + proxy.stop(); + zk.close(); + } + }); + + proxy.start(); + + while (true) { + try { + Thread.sleep(60000); + } catch (Exception e) { + } + } + + } catch (Exception e) { + e.printStackTrace(); + } + return 0; + + } + + /** + * Poll for the configured peer or create it if it does not exist + * (controlled by createIfMissing) + * @param hbaseConf the hbase configuratoin + * @param zk CuratorFramework object + * @param basePath base znode. + * @param internalName id if the peer to check for/create. + * @param createIfMissing if the peer doesn't exist, create it and peer to it. + */ + public void checkForOrCreateReplicationPeer(Configuration hbaseConf, + CuratorFramework zk, String basePath, + String internalName, boolean createIfMissing) { + try (Connection hcon = ConnectionFactory.createConnection(hbaseConf); + Admin admin = hcon.getAdmin()) { + + boolean peerThere = false; + boolean peerEnabled = true; + + while (!peerThere) { + LOG.info("looking for peer " + internalName + " add_peer using " + basePath + " as znode"); + for (ReplicationPeerDescription desc : admin.listReplicationPeers()) { + if (StringUtils.equals(desc.getPeerId(), internalName)) { + + LOG.info("Found peer " + ReflectionToStringBuilder.reflectionToString( + admin.getReplicationPeerConfig(internalName))); + + peerEnabled = desc.isEnabled(); + + peerThere = true; + + if (peerEnabled) { + break; + } else { + LOG.info("Found peer " + internalName + " but it is disabled, " + + "enable the peer to allow the proxy to receive events"); + } + } + } + + if ((createIfMissing) && (peerThere)) { + // see if the peer is disabled + if (!peerEnabled) { + LOG.info("enable peer," + internalName + " autocreate is set"); + admin.enableReplicationPeer(internalName); + peerEnabled = true; + } + } else if ((createIfMissing) && (!peerThere)) { + ReplicationPeerConfig rconf = new ReplicationPeerConfig(); + // get the current cluster's ZK config + String zookeeperQ = hbaseConf.get("hbase.zookeeper.quorum") + + ":" + hbaseConf.get("hbase.zookeeper.property.clientPort"); + String znodePath = zookeeperQ + ":/hbasekafkaproxy"; + rconf.setClusterKey(znodePath); + admin.addReplicationPeer(internalName, rconf); + peerThere = true; + peerEnabled = true; + } + + if ((peerThere) && (peerEnabled)) { + break; + } + Thread.sleep(5000); + } + + LOG.info("found replication peer " + internalName); + + + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + static String toInternalSubscriptionName(String subscriptionName) { + if (subscriptionName.indexOf(INTERNAL_HYPHEN_REPLACEMENT, 0) != -1) { + throw new IllegalArgumentException("Subscription name cannot contain character \\U1400"); + } + return subscriptionName.replace('-', INTERNAL_HYPHEN_REPLACEMENT); + } + + +} diff --git a/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/ProxyHRegionServer.java b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/ProxyHRegionServer.java new file mode 100755 index 0000000000..21ffc69bdf --- /dev/null +++ b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/ProxyHRegionServer.java @@ -0,0 +1,514 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hbase.kafka; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.curator.framework.CuratorFramework; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.ChoreService; +import org.apache.hadoop.hbase.CoordinatedStateManager; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.ipc.FifoRpcScheduler; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.ipc.PriorityFunction; +import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.ipc.SimpleRpcServer; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.hbase.util.DNS; +import org.apache.hadoop.hbase.util.Strings; +import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.zookeeper.CreateMode; + +/** + * Stub region server that receives the replication events and if they pass the routing rules + * forwards them to the configured kafka topic + */ +public class ProxyHRegionServer + implements AdminProtos.AdminService.BlockingInterface, Server, PriorityFunction { + + private CuratorFramework zk; + private Configuration hbaseConf; + private RpcServer rpcServer; + private String zkNodePath; + private String peerName; + private ServerName serverName; + private boolean running = true; + private Log log = LogFactory.getLog(getClass()); + private Producer producer; + private DatumWriter avroWriter = + new SpecificDatumWriter(HbaseKafkaEvent.getClassSchema()); + private TopicRoutingRules routingRules; + + public ProxyHRegionServer() { + + } + + public ProxyHRegionServer(CuratorFramework zk, Configuration hbaseConf, String peerName, + Producer producer, TopicRoutingRules routeRules) { + this.zk = zk; + this.hbaseConf = hbaseConf; + this.peerName = peerName; + this.producer = producer; + this.routingRules = routeRules; + } + + /** + * start the proxy region server + * @throws Exception the proxy could not start + */ + public void start() throws Exception { + try { + String hostName = Strings.domainNamePointerToHostName( + DNS.getDefaultHost(hbaseConf.get("hbase.regionserver.dns.interface", "default"), + hbaseConf.get("hbase.regionserver.dns.nameserver", "default"))); + + this.producer = producer; + + log.info("listening on host is " + hostName); + + InetSocketAddress initialIsa = new InetSocketAddress(hostName, 0); + if (initialIsa.getAddress() == null) { + throw new IllegalArgumentException("Failed resolve of " + initialIsa); + } + String name = "regionserver/" + initialIsa.toString(); + this.rpcServer = new SimpleRpcServer(this, name, getServices(), initialIsa, hbaseConf, + new FifoRpcScheduler(hbaseConf, + hbaseConf.getInt("hbase.regionserver.handler.count", 10))); + + this.serverName = ServerName.valueOf(hostName, rpcServer.getListenerAddress().getPort(), + System.currentTimeMillis()); + + rpcServer.start(); + // Publish our existence in ZooKeeper + + if (zk.checkExists().forPath("/hbasekafkaproxy/rs") == null) { + zk.create().forPath("/hbasekafkaproxy/rs"); + } + + zkNodePath = "/hbasekafkaproxy/rs/" + serverName.getServerName(); + zk.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(zkNodePath); + } catch (Exception e) { + e.printStackTrace(); + throw new IOException(e); + } + this.running = true; + } + + /** + * stop the proxy server + */ + public void stop() { + try { + rpcServer.stop(); + producer.flush(); + producer.close(); + zk.delete().deletingChildrenIfNeeded().forPath(zkNodePath); + } catch (Exception e) { + log.error("exception deleting znode " + zkNodePath, e); + } + + } + + /** + * Handle the replication events that are coming in. + * @param controller contains the cells for the mutations + * @param request contains the WALLEntry objects + * @return ReplicateWALEntryResponse replicateWALEntry on success + * @throws ServiceException if entries could not be processed + */ + @Override + public ReplicateWALEntryResponse replicateWALEntry(RpcController controller, + ReplicateWALEntryRequest request) throws ServiceException { + + try { + List entries = request.getEntryList(); + CellScanner cells = ((HBaseRpcController) controller).cellScanner(); + + int i = 0; + + for (final AdminProtos.WALEntry entry : entries) { + int count = entry.getAssociatedCellCount(); + + for (int y = 0; y < count; y++) { + if (!cells.advance()) { + throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i); + } + + TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray()); + Cell cell = cells.current(); + byte[] columnFamily = CellUtil.cloneFamily(cell); + byte[] qualifier = CellUtil.cloneQualifier(cell); + + List topics = null; + + if (!routingRules.isExclude(table, columnFamily, qualifier)) { + topics = routingRules.getTopics(table, columnFamily, qualifier); + } + + if (!CollectionUtils.isEmpty(topics)) { + byte[] key = CellUtil.cloneRow(cell); + HbaseKafkaEvent event = new HbaseKafkaEvent(); + event.setKey(ByteBuffer.wrap(key)); + event.setDelete(CellUtil.isDelete(cell)); + event.setQualifier(ByteBuffer.wrap(qualifier)); + event.setFamily(ByteBuffer.wrap(columnFamily)); + event.setTable(ByteBuffer.wrap(entry.getKey().getTableName().toByteArray())); + event.setValue(ByteBuffer.wrap(CellUtil.cloneValue(cell))); + event.setTimestamp(cell.getTimestamp()); + pushKafkaMessage(event, key, topics); + } + } + i++; + } + + return AdminProtos.ReplicateWALEntryResponse.newBuilder().build(); + } catch (Exception ie) { + throw new ServiceException(ie); + } + } + + /** + * push the message to the topic(s) + * @param message message to publish + * @param key key for kafka topic + * @param topics topics to publish to + * @throws Exception if message oculd not be sent + */ + public void pushKafkaMessage(HbaseKafkaEvent message, byte[] key, List topics) + throws Exception { + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(bout, null); + avroWriter.write(message, encoder); + encoder.flush(); + + byte[] value = bout.toByteArray(); + + for (String topic : topics) { + ProducerRecord record = new ProducerRecord(topic, key, value); + producer.send(record); + } + } + + /** + * Returns wrapper needed to use RPC service + * @return Wrapper needed to use RPC service + */ + private List getServices() { + List bssi = + new ArrayList(1); + + bssi.add(new RpcServer.BlockingServiceAndInterface( + (BlockingService) AdminProtos.AdminService.newReflectiveBlockingService(this), + AdminProtos.AdminService.BlockingInterface.class)); + return bssi; + } + + /** + * set the routing rules (used for unit tests) + * @param routingRules routing tules to use + */ + public void setRoutingRules(TopicRoutingRules routingRules) { + this.routingRules = routingRules; + } + + /** + * set the kafka producer (used for unit tests) + * @param producer producer to use + */ + public void setProducer(Producer producer) { + this.producer = producer; + } + + @Override + public AdminProtos.ClearCompactionQueuesResponse clearCompactionQueues(RpcController arg0, + ClearCompactionQueuesRequest arg1) throws ServiceException { + + throw new IllegalStateException("not implemented!"); + + } + + @Override + public QuotaProtos.GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller, + QuotaProtos.GetSpaceQuotaSnapshotsRequest request) throws ServiceException { + throw new IllegalStateException("not implemented!"); + } + + @Override + public AdminProtos.ExecuteProceduresResponse executeProcedures(RpcController controller, + AdminProtos.ExecuteProceduresRequest request) throws ServiceException { + throw new IllegalStateException("not implemented!"); + } + + @Override + public void abort(String why, Throwable e) { + + throw new IllegalStateException("not implemented!"); + + } + + @Override + public boolean isAborted() { + + throw new IllegalStateException("not implemented!"); + } + + @Override + public void stop(String why) { + throw new IllegalStateException("not implemented!"); + + } + + @Override + public boolean isStopped() { + + throw new IllegalStateException("not implemented!"); + } + + @Override + public int getPriority(RequestHeader header, Message param, User user) { + + throw new IllegalStateException("not implemented!"); + } + + @Override + public long getDeadline(RequestHeader header, Message param) { + + throw new IllegalStateException("not implemented!"); + } + + @Override + public Configuration getConfiguration() { + + throw new IllegalStateException("not implemented!"); + } + + @Override + public ZooKeeperWatcher getZooKeeper() { + + throw new IllegalStateException("not implemented!"); + } + + @Override + public Connection getConnection() { + + throw new IllegalStateException("not implemented!"); + } + + @Override + public ClusterConnection getClusterConnection() { + + throw new IllegalStateException("not implemented!"); + } + + @Override + public MetaTableLocator getMetaTableLocator() { + + throw new IllegalStateException("not implemented!"); + } + + @Override + public ServerName getServerName() { + + throw new IllegalStateException("not implemented!"); + } + + @Override + public CoordinatedStateManager getCoordinatedStateManager() { + + throw new IllegalStateException("not implemented!"); + } + + @Override + public ChoreService getChoreService() { + + throw new IllegalStateException("not implemented!"); + } + + @Override + public GetRegionInfoResponse getRegionInfo(RpcController controller, GetRegionInfoRequest request) + throws ServiceException { + + throw new IllegalStateException("not implemented!"); + } + + @Override + public GetStoreFileResponse getStoreFile(RpcController controller, GetStoreFileRequest request) + throws ServiceException { + + throw new IllegalStateException("not implemented!"); + } + + @Override + public GetOnlineRegionResponse getOnlineRegion(RpcController controller, + GetOnlineRegionRequest request) throws ServiceException { + + throw new IllegalStateException("not implemented!"); + } + + @Override + public OpenRegionResponse openRegion(RpcController controller, OpenRegionRequest request) + throws ServiceException { + + throw new IllegalStateException("not implemented!"); + } + + @Override + public WarmupRegionResponse warmupRegion(RpcController controller, WarmupRegionRequest request) + throws ServiceException { + + throw new IllegalStateException("not implemented!"); + } + + @Override + public CloseRegionResponse closeRegion(RpcController controller, CloseRegionRequest request) + throws ServiceException { + + throw new IllegalStateException("not implemented!"); + } + + @Override + public FlushRegionResponse flushRegion(RpcController controller, FlushRegionRequest request) + throws ServiceException { + + throw new IllegalStateException("not implemented!"); + } + + @Override + public CompactRegionResponse compactRegion(RpcController controller, CompactRegionRequest request) + throws ServiceException { + + throw new IllegalStateException("not implemented!"); + } + + @Override + public ReplicateWALEntryResponse replay(RpcController controller, + ReplicateWALEntryRequest request) throws ServiceException { + + throw new IllegalStateException("not implemented!"); + } + + @Override + public RollWALWriterResponse rollWALWriter(RpcController controller, RollWALWriterRequest request) + throws ServiceException { + + throw new IllegalStateException("not implemented!"); + } + + @Override + public GetServerInfoResponse getServerInfo(RpcController controller, GetServerInfoRequest request) + throws ServiceException { + + throw new IllegalStateException("not implemented!"); + } + + @Override + public StopServerResponse stopServer(RpcController controller, StopServerRequest request) + throws ServiceException { + + throw new IllegalStateException("not implemented!"); + } + + @Override + public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller, + UpdateFavoredNodesRequest request) throws ServiceException { + + throw new IllegalStateException("not implemented!"); + } + + @Override + public UpdateConfigurationResponse updateConfiguration(RpcController controller, + UpdateConfigurationRequest request) throws ServiceException { + + throw new IllegalStateException("not implemented!"); + } + + @Override + public GetRegionLoadResponse getRegionLoad(RpcController controller, GetRegionLoadRequest request) + throws ServiceException { + + throw new IllegalStateException("not implemented!"); + } + + @Override + public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller, + ClearRegionBlockCacheRequest request) throws ServiceException { + + throw new IllegalStateException("not implemented!"); + } + +} diff --git a/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/Rule.java b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/Rule.java new file mode 100755 index 0000000000..6513730604 --- /dev/null +++ b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/Rule.java @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.kafka; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Implements the matching logic for a rule + */ +public abstract class Rule { + TableName tableName; + private byte [] columnFamily; + private byte [] qualifier; + + boolean qualifierStartsWith = false; + boolean qualifierEndsWith = false; + + byte []ast = Bytes.toBytes("*"); + + /** + * Indicates if the table,column family, and qualifier match the rule + * @param tryTable table name to test + * @param tryColumFamily column family to test + * @param tryQualifier qualifier to test + * @return true if values match the rule + */ + public boolean match(TableName tryTable, byte [] tryColumFamily, byte [] tryQualifier) { + boolean tableMatch = tableMatch(tryTable); + boolean columnFamilyMatch = columnFamilyMatch(tryColumFamily); + boolean qualfierMatch = qualifierMatch(tryQualifier); + + return tableMatch && columnFamilyMatch && qualfierMatch; + } + + /** + * Test if the qualifier matches + * @param tryQualifier qualifier to test + * @return true if the qualifier matches + */ + public boolean qualifierMatch(byte [] tryQualifier) { + + if (qualifier != null) { + if (qualifierStartsWith && qualifierEndsWith) { + return (startsWith(tryQualifier, this.qualifier) || endsWith(tryQualifier, this.qualifier)); + } else if (qualifierStartsWith) { + return startsWith(tryQualifier, this.qualifier); + } else if (qualifierEndsWith) { + return endsWith(tryQualifier, this.qualifier); + } else { + return Bytes.equals(this.qualifier, tryQualifier); + } + } + return true; + } + + /** + * Test if the column family matches the rule + * @param tryColumFamily column family to test + * @return true if the column family matches the rule + */ + public boolean columnFamilyMatch(byte [] tryColumFamily) { + if (columnFamily != null) { + return Bytes.equals(this.columnFamily, tryColumFamily); + } + return true; + } + + /** + * Test if the table matches the table in the rule + * @param tryTable table name to test + * @return true if the table matches the rule + */ + public boolean tableMatch(TableName tryTable) { + if (tableName == null) { + return true; + } + return (tryTable.equals(this.tableName)); + } + + /** + * set the column family for the rule + * @param columnFamily column family to set + */ + public void setColumnFamily(byte [] columnFamily) { + this.columnFamily = columnFamily; + } + + /** + * set the qualifier value for the rule + * @param qualifier qualifier to set + */ + public void setQualifier(byte []qualifier) { + this.qualifier = qualifier; + if (startsWith(qualifier, ast)) { + qualifierEndsWith = true; + this.qualifier = ArrayUtils.subarray(this.qualifier, ast.length, this.qualifier.length); + } + if (endsWith(qualifier, ast)) { + qualifierStartsWith = true; + this.qualifier = ArrayUtils.subarray(this.qualifier, 0, this.qualifier.length - ast.length); + } + if ((qualifierStartsWith) || (qualifierEndsWith)) { + if (this.qualifier.length == 0) { + this.qualifier = null; + } + } + + } + + /** + * Tests if data starts with startsWith + * @param data byte array to test + * @param startsWith array that we want to see if data starts with + * @return true if data starts with startsWith + */ + public static boolean startsWith(byte [] data, byte [] startsWith) { + if (startsWith.length > data.length) { + return false; + } + + if (startsWith.length == data.length) { + return Bytes.equals(data, startsWith); + } + + for (int i = 0; i < startsWith.length; i++) { + if (startsWith[i] != data[i]) { + return false; + } + } + return true; + } + + /** + * Tests if data ends with endsWith + * @param data byte array to test + * @param endsWith array that we want to see if data ends with + * @return true if data ends with endsWith + */ + public static boolean endsWith(byte [] data, byte [] endsWith) { + if (endsWith.length > data.length) { + return false; + } + + if (endsWith.length == data.length) { + return Bytes.equals(data, endsWith); + } + + int endStart = data.length - endsWith.length; + + for (int i = 0; i < endsWith.length; i++) { + //if (endsWith[i]!=data[(data.length-1)-(endsWith.length+i)]){ + if (endsWith[i] != data[endStart + i]) { + return false; + } + } + return true; + } + + /** + * get the table for the rule + * @return tablename for ule + */ + public TableName getTableName() { + return tableName; + } + + /** + * set the table for the rule + * @param tableName to set + */ + public void setTableName(TableName tableName) { + this.tableName = tableName; + } + + /** + * get the column family for the rule + * @return column family + */ + public byte[] getColumnFamily() { + return columnFamily; + } + + /** + * get the qualifier for the rule + * @return qualfier + */ + public byte[] getQualifier() { + return qualifier; + } + + + /** + * indicates if the qualfier is a wildcard like *foo + * @return true if rule is like *foo + */ + public boolean isQualifierEndsWith() { + return qualifierEndsWith; + } + + /** + * indicates if the qualfier is a wildcard like foo* + * @return true if rule is like foo* + */ + public boolean isQualifierStartsWith() { + return qualifierStartsWith; + + } +} diff --git a/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/TopicRoutingRules.java b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/TopicRoutingRules.java new file mode 100755 index 0000000000..69f00192e6 --- /dev/null +++ b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/TopicRoutingRules.java @@ -0,0 +1,235 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.kafka; + + +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.util.Bytes; +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.NodeList; + +/** + * The topic routing/drop rules. + * + * <rules> + * <rule .... /> + * + * </rules> + * + * + * + * A wildcard can only be at the beginning or at the end (can be at both sides). + * + * drop rules are always evaluated first. + * + * drop examples: + * <rule action="drop" table="default:MyTable" /> + * Do not send replication events for table MyTable + * + * <rule action="drop" table="default:MyTable" columnFamily="data"/> + * Do not send replication events for table MyTable's column family data + * + * <rule action="drop" table="default:MyTable" columnFamily="data" qualfier="dhold:*"/> + * Do not send replication events for any qualiifier on table MyTable with column family data + * + * routeRules examples: + * + * <rule action="routeRules" table="default:MyTable" topic="mytopic"/> + * routeRules all replication events for table default:Mytable to topic mytopic + * + * <rule action="routeRules" table="default:MyTable" columnFamily="data" topic="mytopic"/> + * routeRules all replication events for table default:Mytable column family data to topic mytopic + * + * <rule action="routeRules" table="default:MyTable" columnFamily="data" topic="mytopic" + * qualifier="hold:*"/> + * routeRules all replication events for qualifiers that start with hold: for table + * default:Mytable column family data to topic mytopic + */ +public class TopicRoutingRules { + + private List dropRules = new ArrayList<>(); + private List routeRules = new ArrayList<>(); + + private File sourceFile; + + /** + * used for testing + */ + public TopicRoutingRules() { + + } + + /** + * construct rule set from file + * @param source file that countains the rule set + * @throws Exception if load fails + */ + public TopicRoutingRules(File source) throws Exception { + this.sourceFile = source; + this.reloadIfFile(); + } + + /** + * Reload the ruleset if it was parsed from a file + * @throws Exception error loading rule set + */ + public void reloadIfFile() throws Exception { + if (this.sourceFile!=null){ + List dropRulesSave = this.dropRules; + List routeRulesSave = this.routeRules; + + try (FileInputStream fin = new FileInputStream(this.sourceFile)) { + List dropRulesNew = new ArrayList<>(); + List routeRulesNew = new ArrayList<>(); + + parseRules(fin,dropRulesNew,routeRulesNew); + + this.dropRules = dropRulesNew; + this.routeRules = routeRulesNew; + + } catch (Exception e){ + // roll back + this.dropRules=dropRulesSave; + this.routeRules=routeRulesSave; + // re-throw + throw e; + } + } + } + + /** + * parse rules manually from an input stream + * @param input InputStream that contains rule text + */ + public void parseRules(InputStream input) { + List dropRulesNew = new ArrayList<>(); + List routeRulesNew = new ArrayList<>(); + parseRules(input,dropRulesNew,routeRulesNew); + this.dropRules = dropRulesNew; + this.routeRules = routeRulesNew; + } + + /** + * Parse the XML in the InputStream into route/drop rules and store them in the passed in Lists + * @param input inputstream the contains the ruleset + * @param dropRules list to accumulate drop rules + * @param routeRules list to accumulate route rules + */ + public void parseRules(InputStream input,List dropRules, List routeRules) { + try { + DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance(); + DocumentBuilder dBuilder = dbFactory.newDocumentBuilder(); + Document doc = dBuilder.parse(input); + NodeList nodList = doc.getElementsByTagName("rule"); + for (int i = 0; i < nodList.getLength(); i++) { + if (nodList.item(i) instanceof Element) { + parseRule((Element) nodList.item(i),dropRules,routeRules); + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Parse a individual rule from a Element. + * @param n the element + * @param dropRules list to accumulate drop rules + * @param routeRules list to accumulate route rules + */ + public void parseRule(Element n, List dropRules, List routeRules) { + Rule r = null; + if (n.getAttribute("action").equals("drop")) { + r = new DropRule(); + dropRules.add((DropRule) r); + } else { + r = new TopicRule(n.getAttribute("topic")); + routeRules.add((TopicRule) r); + } + if (n.hasAttribute("table")) { + r.setTableName(TableName.valueOf(n.getAttribute("table"))); + } + if (n.hasAttribute("columnFamily")) { + r.setColumnFamily(Bytes.toBytes(n.getAttribute("columnFamily"))); + } + if (n.hasAttribute("qualifier")) { + String qual = n.getAttribute("qualifier"); + r.setQualifier(Bytes.toBytes(qual)); + } + } + + /** + * Indicates if a cell mutation should be dropped instead of routed to kafka. + * @param table table name to check + * @param columnFamily column family to check + * @param qualifer qualifier name to check + * @return if the mutation should be dropped instead of routed to Kafka + */ + public boolean isExclude(final TableName table, final byte []columnFamily, + final byte[] qualifer) { + for (DropRule r : getDropRules()) { + if (r.match(table, columnFamily, qualifer)) { + return true; + } + } + return false; + } + + /** + * Get topics for the table/column family/qualifier combination + * @param table table name to check + * @param columnFamily column family to check + * @param qualifer qualifier name to check + * @return list of topics that match the passed in values (or empty for none). + */ + public List getTopics(TableName table, byte []columnFamily, byte []qualifer) { + List ret = new ArrayList<>(); + for (TopicRule r : getRouteRules()) { + if (r.match(table, columnFamily, qualifer)) { + ret.addAll(r.getTopics()); + } + } + + return ret; + } + + /** + * returns all the drop rules (used for testing) + * @return drop rules + */ + public List getDropRules() { + return dropRules; + } + + /** + * returns all the route rules (used for testing) + * @return route rules + */ + public List getRouteRules() { + return routeRules; + } +} diff --git a/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/TopicRule.java b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/TopicRule.java new file mode 100755 index 0000000000..aba0c1d7ee --- /dev/null +++ b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/TopicRule.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.kafka; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * If the Cell matches the rule returns the configured topics. + */ +public class TopicRule extends Rule { + private Set topics = new HashSet<>(); + + public TopicRule(String topics) { + this.topics.addAll(Arrays.stream(topics.split(",")).collect(Collectors.toList())); + } + + public Set getTopics() { + return topics; + } +} diff --git a/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/org/apache/hadoop/hbase/kafka/HbaseRpcControllerMock.java b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/org/apache/hadoop/hbase/kafka/HbaseRpcControllerMock.java new file mode 100755 index 0000000000..cb76a1b56c --- /dev/null +++ b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/org/apache/hadoop/hbase/kafka/HbaseRpcControllerMock.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.org.apache.hadoop.hbase.kafka; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback; + +/** + * Mocks HBaseRpcController so we can pass in our own CellScanner for testing + */ +public class HbaseRpcControllerMock implements HBaseRpcController { + + public HbaseRpcControllerMock(final List cellz) { + this.scanMe = new CellScanner() { + Iterator it = cellz.iterator(); + + @Override + public Cell current() { + return it.next(); + } + + @Override + public boolean advance() throws IOException { + return it.hasNext(); + } + }; + } + + private CellScanner scanMe; + + @Override + public CellScanner cellScanner() { + return scanMe; + } + + @Override + public void setCellScanner(CellScanner cellScanner) { + this.scanMe = cellScanner; + } + + @Override + public void setPriority(int priority) { + + } + + @Override + public void setPriority(TableName tn) { + + } + + @Override + public int getPriority() { + return 0; + } + + @Override + public int getCallTimeout() { + return 0; + } + + @Override + public void setCallTimeout(int callTimeout) { + + } + + @Override + public boolean hasCallTimeout() { + return false; + } + + @Override + public void setFailed(IOException e) { + + } + + @Override + public IOException getFailed() { + return null; + } + + @Override + public void setDone(CellScanner cellScanner) { + + } + + @Override + public void notifyOnCancel(RpcCallback callback) { + + } + + @Override + public void notifyOnCancel(RpcCallback callback, CancellationCallback action) + throws IOException { + } + + @Override + public void reset() { + + } + + @Override + public boolean failed() { + return false; + } + + @Override + public String errorText() { + return null; + } + + @Override + public void startCancel() { + + } + + @Override + public void setFailed(String reason) { + + } + + @Override + public boolean isCanceled() { + return false; + } +} diff --git a/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/org/apache/hadoop/hbase/kafka/ProducerForTesting.java b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/org/apache/hadoop/hbase/kafka/ProducerForTesting.java new file mode 100755 index 0000000000..517ec3ea53 --- /dev/null +++ b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/org/apache/hadoop/hbase/kafka/ProducerForTesting.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hbase.org.apache.hadoop.hbase.kafka; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.hadoop.hbase.kafka.HbaseKafkaEvent; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.ProducerFencedException; + +/** + * Mocks Kafka producer for testing + */ +public class ProducerForTesting implements Producer { + Map> messages = new HashMap<>(); + SpecificDatumReader dreader = new SpecificDatumReader<>(HbaseKafkaEvent.SCHEMA$); + + public Map> getMessages() { + return messages; + } + + @Override + public void abortTransaction() throws ProducerFencedException { + } + + @Override + public Future send(ProducerRecord producerRecord) { + try { + + BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(producerRecord.value(), null); + HbaseKafkaEvent event = dreader.read(null, decoder); + if (!messages.containsKey(producerRecord.topic())) { + messages.put(producerRecord.topic(), new ArrayList<>()); + } + messages.get(producerRecord.topic()).add(event); + return null; + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + + @Override + public Future send(ProducerRecord producerRecord, + Callback callback) { + return null; + } + + @Override + public void flush() { + } + + @Override + public List partitionsFor(String s) { + return null; + } + + @Override + public Map metrics() { + return null; + } + + @Override + public void close() { + } + + @Override + public void close(long l, TimeUnit timeUnit) { + } + + @Override + public void initTransactions() { + } + + @Override + public void beginTransaction() throws ProducerFencedException { + } + + @Override + public void sendOffsetsToTransaction(Map offsets, + String consumerGroupId) throws ProducerFencedException { + } + + @Override + public void commitTransaction() throws ProducerFencedException { + } +} diff --git a/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/org/apache/hadoop/hbase/kafka/TestDropRule.java b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/org/apache/hadoop/hbase/kafka/TestDropRule.java new file mode 100755 index 0000000000..60369599a9 --- /dev/null +++ b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/org/apache/hadoop/hbase/kafka/TestDropRule.java @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hbase.org.apache.hadoop.hbase.kafka; + +import java.io.ByteArrayInputStream; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.kafka.DropRule; +import org.apache.hadoop.hbase.kafka.TopicRoutingRules; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test different cases of drop rules. + */ + +@Category(SmallTests.class) +public class TestDropRule { + private static final String DROP_RULE1 = + ""; + private static final String DROP_RULE2 = + ""; + private static final String DROP_RULE3 = + ""; + + private static final String DROP_RULE4 = + ""; + private static final String DROP_RULE5 = + ""; + + private static final String DROP_RULE6 = + ""; + + @Test + public void testDropies1() { + TopicRoutingRules rules = new TopicRoutingRules(); + try { + rules.parseRules(new ByteArrayInputStream(DROP_RULE1.getBytes())); + Assert.assertEquals(1, rules.getDropRules().size()); + Assert.assertEquals(TableName.valueOf("default:MyTable"), + rules.getDropRules().get(0).getTableName()); + Assert.assertEquals(null, rules.getDropRules().get(0).getColumnFamily()); + Assert.assertEquals(null, rules.getDropRules().get(0).getQualifier()); + Assert.assertEquals(0, rules.getRouteRules().size()); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + @Test + public void testDropies2() { + TopicRoutingRules rules = new TopicRoutingRules(); + try { + rules.parseRules(new ByteArrayInputStream(DROP_RULE2.getBytes())); + Assert.assertEquals(1, rules.getDropRules().size()); + Assert.assertEquals(TableName.valueOf("default:MyTable"), + rules.getDropRules().get(0).getTableName()); + Assert.assertTrue( + Bytes.equals("data".getBytes(), rules.getDropRules().get(0).getColumnFamily())); + Assert.assertEquals(null, rules.getDropRules().get(0).getQualifier()); + Assert.assertEquals(0, rules.getRouteRules().size()); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + @Test + public void testDropies3() { + TopicRoutingRules rules = new TopicRoutingRules(); + try { + rules.parseRules(new ByteArrayInputStream(DROP_RULE3.getBytes())); + Assert.assertEquals(1, rules.getDropRules().size()); + Assert.assertEquals(TableName.valueOf("default:MyTable"), + rules.getDropRules().get(0).getTableName()); + Assert.assertTrue( + Bytes.equals("data".getBytes(), rules.getDropRules().get(0).getColumnFamily())); + Assert + .assertTrue(Bytes.equals("dhold".getBytes(), rules.getDropRules().get(0).getQualifier())); + Assert.assertEquals(0, rules.getRouteRules().size()); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + @Test + public void testDropies4() { + TopicRoutingRules rules = new TopicRoutingRules(); + try { + rules.parseRules(new ByteArrayInputStream(DROP_RULE4.getBytes())); + Assert.assertEquals(1, rules.getDropRules().size()); + Assert.assertEquals(TableName.valueOf("default:MyTable"), + rules.getDropRules().get(0).getTableName()); + Assert.assertTrue( + Bytes.equals("data".getBytes(), rules.getDropRules().get(0).getColumnFamily())); + Assert.assertTrue( + Bytes.equals("dhold:".getBytes(), rules.getDropRules().get(0).getQualifier())); + Assert.assertEquals(0, rules.getRouteRules().size()); + + DropRule drop = rules.getDropRules().get(0); + Assert.assertFalse( + drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(), "blah".getBytes())); + Assert.assertFalse( + drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(), "dholdme".getBytes())); + Assert.assertTrue( + drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(), "dhold:me".getBytes())); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + @Test + public void testDropies5() { + TopicRoutingRules rules = new TopicRoutingRules(); + try { + rules.parseRules(new ByteArrayInputStream(DROP_RULE5.getBytes())); + Assert.assertEquals(1, rules.getDropRules().size()); + Assert.assertEquals(TableName.valueOf("default:MyTable"), + rules.getDropRules().get(0).getTableName()); + Assert.assertTrue( + Bytes.equals("data".getBytes(), rules.getDropRules().get(0).getColumnFamily())); + Assert.assertTrue( + Bytes.equals("pickme".getBytes(), rules.getDropRules().get(0).getQualifier())); + Assert.assertEquals(0, rules.getRouteRules().size()); + + DropRule drop = rules.getDropRules().get(0); + Assert.assertFalse( + drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(), "blah".getBytes())); + Assert.assertFalse(drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(), + "blacickme".getBytes())); + Assert.assertTrue(drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(), + "hithere.pickme".getBytes())); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + @Test + public void testDropies6() { + TopicRoutingRules rules = new TopicRoutingRules(); + try { + rules.parseRules(new ByteArrayInputStream(DROP_RULE6.getBytes())); + Assert.assertEquals(1, rules.getDropRules().size()); + Assert.assertEquals(TableName.valueOf("default:MyTable"), + rules.getDropRules().get(0).getTableName()); + Assert.assertTrue( + Bytes.equals("data".getBytes(), rules.getDropRules().get(0).getColumnFamily())); + Assert.assertTrue( + Bytes.equals("pickme".getBytes(), rules.getDropRules().get(0).getQualifier())); + Assert.assertEquals(0, rules.getRouteRules().size()); + + DropRule drop = rules.getDropRules().get(0); + Assert.assertFalse( + drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(), "blah".getBytes())); + Assert.assertFalse(drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(), + "blacickme".getBytes())); + Assert.assertTrue(drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(), + "hithere.pickme".getBytes())); + Assert.assertTrue(drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(), + "pickme.pleaze.do.it".getBytes())); + Assert.assertFalse(drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(), + "please.pickme.pleaze".getBytes())); + Assert.assertTrue(drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(), + "pickme.pleaze.pickme".getBytes())); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + +} diff --git a/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/org/apache/hadoop/hbase/kafka/TestProxyRegionServer.java b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/org/apache/hadoop/hbase/kafka/TestProxyRegionServer.java new file mode 100755 index 0000000000..fcd0c08a74 --- /dev/null +++ b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/org/apache/hadoop/hbase/kafka/TestProxyRegionServer.java @@ -0,0 +1,289 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hbase.org.apache.hadoop.hbase.kafka; + +import java.io.ByteArrayInputStream; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.kafka.ProxyHRegionServer; +import org.apache.hadoop.hbase.kafka.TopicRoutingRules; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test ProxyHRegionServer + */ +@Category(SmallTests.class) +public class TestProxyRegionServer { + + public AdminProtos.ReplicateWALEntryRequest makeReplicateWALEntryRequest(String table, + int cellcount) { + WALProtos.WALKey key1 = WALProtos.WALKey.newBuilder() + .setTableName(ByteString.copyFrom(table.getBytes())).buildPartial(); + + AdminProtos.WALEntry entry = AdminProtos.WALEntry.newBuilder().setAssociatedCellCount(cellcount) + .setKey(key1).buildPartial(); + + AdminProtos.ReplicateWALEntryRequest req = + AdminProtos.ReplicateWALEntryRequest.newBuilder().addEntry(entry).buildPartial(); + return req; + } + + @Test + public void testReplicationEventNoRules() { + ProxyHRegionServer myReplicationProxy = new ProxyHRegionServer(); + List cells = new ArrayList<>(); + KeyValue kv = new KeyValue("row".getBytes(), "family".getBytes(), "qualifier".getBytes(), 1, + "value".getBytes()); + cells.add(kv); + + AdminProtos.ReplicateWALEntryRequest req = + makeReplicateWALEntryRequest("default:Mikey", cells.size()); + + TopicRoutingRules rules = new TopicRoutingRules(); + rules.parseRules(new ByteArrayInputStream("".getBytes())); + + ProducerForTesting mockProducer = new ProducerForTesting(); + myReplicationProxy.setProducer(mockProducer); + myReplicationProxy.setRoutingRules(rules); + + HBaseRpcController rpcMock = new HbaseRpcControllerMock(cells); + + try { + myReplicationProxy.replicateWALEntry(rpcMock, req); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + + // no rules, so this should be empty. + Assert.assertEquals(0, mockProducer.getMessages().size()); + } + + @Test + public void testDropEvent() { + ProxyHRegionServer myReplicationProxy = new ProxyHRegionServer(); + List cells = new ArrayList<>(); + KeyValue kv = new KeyValue("row1".getBytes(), "family".getBytes(), "qualifier".getBytes(), 1, + "value".getBytes()); + cells.add(kv); + + kv = new KeyValue("row2".getBytes(), "family".getBytes(), "qualifier".getBytes(), 1, + "value".getBytes()); + + AdminProtos.ReplicateWALEntryRequest req = + makeReplicateWALEntryRequest("default:Mikey", cells.size()); + + TopicRoutingRules rules = new TopicRoutingRules(); + rules.parseRules(new ByteArrayInputStream( + "".getBytes())); + + ProducerForTesting mockProducer = new ProducerForTesting(); + myReplicationProxy.setProducer(mockProducer); + myReplicationProxy.setRoutingRules(rules); + + HBaseRpcController rpcMock = new HbaseRpcControllerMock(cells); + + try { + myReplicationProxy.replicateWALEntry(rpcMock, req); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + + // no rules, so this should be empty. + Assert.assertEquals(0, mockProducer.getMessages().size()); + } + + @Test + public void testRouteBothEvents() { + ProxyHRegionServer myReplicationProxy = new ProxyHRegionServer(); + List cells = new ArrayList<>(); + KeyValue kv = new KeyValue("row1".getBytes(), "family1".getBytes(), "qualifier1".getBytes(), 1, + "value1".getBytes()); + cells.add(kv); + + kv = new KeyValue("row2".getBytes(), "family2".getBytes(), "qualifier2".getBytes(), 2, + "value2".getBytes()); + cells.add(kv); + + AdminProtos.ReplicateWALEntryRequest req = + makeReplicateWALEntryRequest("default:Mikey", cells.size()); + + TopicRoutingRules rules = new TopicRoutingRules(); + rules.parseRules(new ByteArrayInputStream( + "" + .getBytes())); + + ProducerForTesting mockProducer = new ProducerForTesting(); + myReplicationProxy.setProducer(mockProducer); + myReplicationProxy.setRoutingRules(rules); + + HBaseRpcController rpcMock = new HbaseRpcControllerMock(cells); + + try { + myReplicationProxy.replicateWALEntry(rpcMock, req); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + + // we routed both to topic foobar + Assert.assertEquals(1, mockProducer.getMessages().size()); + Assert.assertEquals(2, mockProducer.getMessages().get("foobar").size()); + + String valueCheck = + new String(mockProducer.getMessages().get("foobar").get(0).getKey().array()); + Assert.assertEquals("row1", valueCheck); + valueCheck = new String(mockProducer.getMessages().get("foobar").get(0).getFamily().array()); + Assert.assertEquals("family1", valueCheck); + valueCheck = new String(mockProducer.getMessages().get("foobar").get(0).getQualifier().array()); + Assert.assertEquals("qualifier1", valueCheck); + valueCheck = new String(mockProducer.getMessages().get("foobar").get(0).getValue().array()); + Assert.assertEquals("value1", valueCheck); + + valueCheck = new String(mockProducer.getMessages().get("foobar").get(1).getKey().array()); + Assert.assertEquals("row2", valueCheck); + valueCheck = new String(mockProducer.getMessages().get("foobar").get(1).getFamily().array()); + Assert.assertEquals("family2", valueCheck); + valueCheck = new String(mockProducer.getMessages().get("foobar").get(1).getQualifier().array()); + Assert.assertEquals("qualifier2", valueCheck); + valueCheck = new String(mockProducer.getMessages().get("foobar").get(1).getValue().array()); + Assert.assertEquals("value2", valueCheck); + } + + @Test + public void testRoute1Event() { + ProxyHRegionServer myReplicationProxy = new ProxyHRegionServer(); + List cells = new ArrayList<>(); + KeyValue kv = new KeyValue("row1".getBytes(), "family1".getBytes(), "qualifier1".getBytes(), 1, + "value1".getBytes()); + cells.add(kv); + + /* + * public KeyValue(final byte[] row, final byte[] family, final byte[] qualifier, final long + * timestamp, final byte[] value) { this(row, family, qualifier, timestamp, Type.Put, value); } + */ + + kv = new KeyValue("row2".getBytes(), "family2".getBytes(), "qualifier2".getBytes(), + + 2, "value2".getBytes()); + cells.add(kv); + + AdminProtos.ReplicateWALEntryRequest req = + makeReplicateWALEntryRequest("default:Mikey", cells.size()); + + TopicRoutingRules rules = new TopicRoutingRules(); + rules.parseRules(new ByteArrayInputStream( + ("") + .getBytes())); + + ProducerForTesting mockProducer = new ProducerForTesting(); + myReplicationProxy.setProducer(mockProducer); + myReplicationProxy.setRoutingRules(rules); + + HBaseRpcController rpcMock = new HbaseRpcControllerMock(cells); + + try { + myReplicationProxy.replicateWALEntry(rpcMock, req); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + + // we routed both to topic foobar + Assert.assertEquals(1, mockProducer.getMessages().size()); + Assert.assertEquals(1, mockProducer.getMessages().get("foobar").size()); + + String valueCheck = + new String(mockProducer.getMessages().get("foobar").get(0).getKey().array()); + Assert.assertEquals("row2", valueCheck); + valueCheck = new String(mockProducer.getMessages().get("foobar").get(0).getFamily().array()); + Assert.assertEquals("family2", valueCheck); + valueCheck = new String(mockProducer.getMessages().get("foobar").get(0).getQualifier().array()); + Assert.assertEquals("qualifier2", valueCheck); + valueCheck = new String(mockProducer.getMessages().get("foobar").get(0).getValue().array()); + Assert.assertEquals("value2", valueCheck); + } + + @Test + public void testRouteBothEvents2() { + ProxyHRegionServer myReplicationProxy = new ProxyHRegionServer(); + List cells = new ArrayList<>(); + KeyValue kv = new KeyValue("row1".getBytes(), "family1".getBytes(), "qualifier1".getBytes(), 1, + "value1".getBytes()); + cells.add(kv); + + kv = new KeyValue("row2".getBytes(), "family2".getBytes(), "qualifier2".getBytes(), 2, + "value2".getBytes()); + cells.add(kv); + + AdminProtos.ReplicateWALEntryRequest req = + makeReplicateWALEntryRequest("default:Mikey", cells.size()); + + TopicRoutingRules rules = new TopicRoutingRules(); + rules.parseRules(new ByteArrayInputStream( + ("") + .getBytes())); + + ProducerForTesting mockProducer = new ProducerForTesting(); + myReplicationProxy.setProducer(mockProducer); + myReplicationProxy.setRoutingRules(rules); + + HBaseRpcController rpcMock = new HbaseRpcControllerMock(cells); + + try { + myReplicationProxy.replicateWALEntry(rpcMock, req); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + + // we routed both to topic foobar (using a wildcarded qualifier) + Assert.assertEquals(1, mockProducer.getMessages().size()); + Assert.assertEquals(2, mockProducer.getMessages().get("foobar").size()); + + String valueCheck = + new String(mockProducer.getMessages().get("foobar").get(0).getKey().array()); + Assert.assertEquals("row1", valueCheck); + valueCheck = new String(mockProducer.getMessages().get("foobar").get(0).getFamily().array()); + Assert.assertEquals("family1", valueCheck); + valueCheck = new String(mockProducer.getMessages().get("foobar").get(0).getQualifier().array()); + Assert.assertEquals("qualifier1", valueCheck); + valueCheck = new String(mockProducer.getMessages().get("foobar").get(0).getValue().array()); + Assert.assertEquals("value1", valueCheck); + + valueCheck = new String(mockProducer.getMessages().get("foobar").get(1).getKey().array()); + Assert.assertEquals("row2", valueCheck); + valueCheck = new String(mockProducer.getMessages().get("foobar").get(1).getFamily().array()); + Assert.assertEquals("family2", valueCheck); + valueCheck = new String(mockProducer.getMessages().get("foobar").get(1).getQualifier().array()); + Assert.assertEquals("qualifier2", valueCheck); + valueCheck = new String(mockProducer.getMessages().get("foobar").get(1).getValue().array()); + Assert.assertEquals("value2", valueCheck); + } + +} \ No newline at end of file diff --git a/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/org/apache/hadoop/hbase/kafka/TestQualifierMatching.java b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/org/apache/hadoop/hbase/kafka/TestQualifierMatching.java new file mode 100755 index 0000000000..564baf51b8 --- /dev/null +++ b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/org/apache/hadoop/hbase/kafka/TestQualifierMatching.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hbase.org.apache.hadoop.hbase.kafka; + +import org.apache.hadoop.hbase.kafka.DropRule; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Make sure match rules work + */ +@Category(SmallTests.class) +public class TestQualifierMatching { + + @Test + public void testMatchQualfier() { + DropRule rule = new DropRule(); + rule.setQualifier("data".getBytes()); + Assert.assertTrue(rule.qualifierMatch("data".getBytes())); + + rule = new DropRule(); + rule.setQualifier("data1".getBytes()); + Assert.assertFalse(rule.qualifierMatch("data".getBytes())); + + // if not set, it is a wildcard + rule = new DropRule(); + Assert.assertTrue(rule.qualifierMatch("data".getBytes())); + } + + @Test + public void testStartWithQualifier() { + DropRule rule = new DropRule(); + rule.setQualifier("data*".getBytes()); + Assert.assertTrue(rule.isQualifierStartsWith()); + Assert.assertFalse(rule.isQualifierEndsWith()); + + Assert.assertTrue(rule.qualifierMatch("data".getBytes())); + Assert.assertTrue(rule.qualifierMatch("data1".getBytes())); + Assert.assertTrue(rule.qualifierMatch("datafoobar".getBytes())); + Assert.assertFalse(rule.qualifierMatch("datfoobar".getBytes())); + Assert.assertFalse(rule.qualifierMatch("d".getBytes())); + Assert.assertFalse(rule.qualifierMatch("".getBytes())); + } + + @Test + public void testEndsWithQualifier() { + DropRule rule = new DropRule(); + rule.setQualifier("*data".getBytes()); + Assert.assertFalse(rule.isQualifierStartsWith()); + Assert.assertTrue(rule.isQualifierEndsWith()); + + Assert.assertTrue(rule.qualifierMatch("data".getBytes())); + Assert.assertTrue(rule.qualifierMatch("1data".getBytes())); + Assert.assertTrue(rule.qualifierMatch("foobardata".getBytes())); + Assert.assertFalse(rule.qualifierMatch("foobardat".getBytes())); + Assert.assertFalse(rule.qualifierMatch("d".getBytes())); + Assert.assertFalse(rule.qualifierMatch("".getBytes())); + } + +} diff --git a/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/org/apache/hadoop/hbase/kafka/TestRouteRules.java b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/org/apache/hadoop/hbase/kafka/TestRouteRules.java new file mode 100755 index 0000000000..21ff45a417 --- /dev/null +++ b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/org/apache/hadoop/hbase/kafka/TestRouteRules.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hbase.org.apache.hadoop.hbase.kafka; + +import java.io.ByteArrayInputStream; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.kafka.TopicRoutingRules; +import org.apache.hadoop.hbase.kafka.TopicRule; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test drop rules + */ +@Category(SmallTests.class) +public class TestRouteRules { + private static final String ROUTE_RULE1 = + ""; + private static final String ROUTE_RULE2 = + ""; + private static final String ROUTE_RULE3 = + ""; + + private static final String ROUTE_RULE4 = + ""; + private static final String ROUTE_RULE5 = + ""; + + private static final String ROUTE_RULE6 = + ""; + + @Test + public void testTopic1() { + TopicRoutingRules rules = new TopicRoutingRules(); + try { + rules.parseRules(new ByteArrayInputStream(ROUTE_RULE1.getBytes())); + Assert.assertEquals(1, rules.getRouteRules().size()); + Assert.assertEquals(TableName.valueOf("default:MyTable"), + rules.getRouteRules().get(0).getTableName()); + Assert.assertEquals(null, rules.getRouteRules().get(0).getColumnFamily()); + Assert.assertEquals(null, rules.getRouteRules().get(0).getQualifier()); + Assert.assertEquals(0, rules.getDropRules().size()); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + @Test + public void testTopic2() { + TopicRoutingRules rules = new TopicRoutingRules(); + try { + rules.parseRules(new ByteArrayInputStream(ROUTE_RULE2.getBytes())); + Assert.assertEquals(1, rules.getRouteRules().size()); + Assert.assertEquals(TableName.valueOf("default:MyTable"), + rules.getRouteRules().get(0).getTableName()); + Assert.assertTrue( + Bytes.equals("data".getBytes(), rules.getRouteRules().get(0).getColumnFamily())); + Assert.assertEquals(null, rules.getRouteRules().get(0).getQualifier()); + Assert.assertEquals(0, rules.getDropRules().size()); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + @Test + public void testTopic3() { + TopicRoutingRules rules = new TopicRoutingRules(); + try { + rules.parseRules(new ByteArrayInputStream(ROUTE_RULE3.getBytes())); + Assert.assertEquals(1, rules.getRouteRules().size()); + Assert.assertEquals(TableName.valueOf("default:MyTable"), + rules.getRouteRules().get(0).getTableName()); + Assert.assertTrue( + Bytes.equals("data".getBytes(), rules.getRouteRules().get(0).getColumnFamily())); + Assert.assertTrue( + Bytes.equals("dhold".getBytes(), rules.getRouteRules().get(0).getQualifier())); + Assert.assertTrue(rules.getRouteRules().get(0).getTopics().contains("foo")); + Assert.assertEquals(rules.getRouteRules().get(0).getTopics().size(), 1); + + Assert.assertEquals(0, rules.getDropRules().size()); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + @Test + public void testTopic4() { + TopicRoutingRules rules = new TopicRoutingRules(); + try { + rules.parseRules(new ByteArrayInputStream(ROUTE_RULE4.getBytes())); + Assert.assertEquals(1, rules.getRouteRules().size()); + Assert.assertEquals(TableName.valueOf("default:MyTable"), + rules.getRouteRules().get(0).getTableName()); + Assert.assertTrue( + Bytes.equals("data".getBytes(), rules.getRouteRules().get(0).getColumnFamily())); + Assert.assertTrue( + Bytes.equals("dhold:".getBytes(), rules.getRouteRules().get(0).getQualifier())); + Assert.assertEquals(0, rules.getDropRules().size()); + + TopicRule route = rules.getRouteRules().get(0); + Assert.assertFalse( + route.match(TableName.valueOf("default:MyTable"), "data".getBytes(), "blah".getBytes())); + Assert.assertFalse( + route.match(TableName.valueOf("default:MyTable"), "data".getBytes(), "dholdme".getBytes())); + Assert.assertTrue(route.match(TableName.valueOf("default:MyTable"), "data".getBytes(), + "dhold:me".getBytes())); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + @Test + public void testTopic5() { + TopicRoutingRules rules = new TopicRoutingRules(); + try { + rules.parseRules(new ByteArrayInputStream(ROUTE_RULE5.getBytes())); + Assert.assertEquals(1, rules.getRouteRules().size()); + Assert.assertEquals(TableName.valueOf("default:MyTable"), + rules.getRouteRules().get(0).getTableName()); + Assert.assertTrue( + Bytes.equals("data".getBytes(), rules.getRouteRules().get(0).getColumnFamily())); + Assert.assertTrue( + Bytes.equals("pickme".getBytes(), rules.getRouteRules().get(0).getQualifier())); + Assert.assertEquals(0, rules.getDropRules().size()); + + TopicRule route = rules.getRouteRules().get(0); + Assert.assertFalse( + route.match(TableName.valueOf("default:MyTable"), "data".getBytes(), "blah".getBytes())); + Assert.assertFalse(route.match(TableName.valueOf("default:MyTable"), "data".getBytes(), + "blacickme".getBytes())); + Assert.assertTrue(route.match(TableName.valueOf("default:MyTable"), "data".getBytes(), + "hithere.pickme".getBytes())); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + @Test + public void testTopic6() { + TopicRoutingRules rules = new TopicRoutingRules(); + try { + rules.parseRules(new ByteArrayInputStream(ROUTE_RULE6.getBytes())); + Assert.assertEquals(1, rules.getRouteRules().size()); + Assert.assertEquals(TableName.valueOf("default:MyTable"), + rules.getRouteRules().get(0).getTableName()); + Assert.assertTrue( + Bytes.equals("data".getBytes(), rules.getRouteRules().get(0).getColumnFamily())); + Assert.assertTrue( + Bytes.equals("pickme".getBytes(), rules.getRouteRules().get(0).getQualifier())); + Assert.assertEquals(0, rules.getDropRules().size()); + + TopicRule route = rules.getRouteRules().get(0); + Assert.assertFalse( + route.match(TableName.valueOf("default:MyTable"), "data".getBytes(), "blah".getBytes())); + Assert.assertFalse(route.match(TableName.valueOf("default:MyTable"), "data".getBytes(), + "blacickme".getBytes())); + Assert.assertTrue(route.match(TableName.valueOf("default:MyTable"), "data".getBytes(), + "hithere.pickme".getBytes())); + Assert.assertTrue(route.match(TableName.valueOf("default:MyTable"), "data".getBytes(), + "pickme.pleaze.do.it".getBytes())); + Assert.assertFalse(route.match(TableName.valueOf("default:MyTable"), "data".getBytes(), + "please.pickme.pleaze".getBytes())); + Assert.assertTrue(route.match(TableName.valueOf("default:MyTable"), "data".getBytes(), + "pickme.pleaze.pickme".getBytes())); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + +} diff --git a/pom.xml b/pom.xml index b42308527f..ea7db24bd8 100755 --- a/pom.xml +++ b/pom.xml @@ -92,6 +92,9 @@ hbase-metrics hbase-spark-it hbase-backup + hbase-kafka-model + hbase-kafka-proxy + @@ -1748,6 +1751,16 @@ org.apache.hbase ${project.version} + + hbase-kafka-model + org.apache.hbase + ${project.version} + + + hbase-kafka-proxy + org.apache.hbase + ${project.version} + hbase-metrics-api org.apache.hbase