diff --git a/bin/hbase b/bin/hbase index 559a02e9f8..d8179075e1 100755 --- a/bin/hbase +++ b/bin/hbase @@ -94,6 +94,7 @@ if [ $# = 0 ]; then echo " shell Run the HBase shell" echo " hbck Run the hbase 'fsck' tool" echo " snapshot Tool for managing snapshots" + if [ "${in_omnibus_tarball}" = "true" ]; then echo " wal Write-ahead-log analyzer" echo " hfile Store file analyzer" @@ -104,8 +105,10 @@ if [ $# = 0 ]; then echo " rest Run an HBase REST server" echo " thrift Run the HBase Thrift server" echo " thrift2 Run the HBase Thrift2 server" + echo " kafkaproxy Run the HBase Kafka Proxy server" echo " clean Run the HBase clean up script" fi + echo " classpath Dump hbase CLASSPATH" echo " mapredcp Dump CLASSPATH entries required by mapreduce" echo " pe Run PerformanceEvaluation" @@ -382,7 +385,7 @@ fi unset IFS #Set the right GC options based on the what we are running -declare -a server_cmds=("master" "regionserver" "thrift" "thrift2" "rest" "avro" "zookeeper") +declare -a server_cmds=("master" "regionserver" "thrift" "thrift2" "rest" "avro" "zookeeper" "kafkaproxy") for cmd in ${server_cmds[@]}; do if [[ $cmd == $COMMAND ]]; then server=true @@ -561,6 +564,11 @@ elif [ "$COMMAND" = "thrift2" ] ; then if [ "$1" != "stop" ] ; then HBASE_OPTS="$HBASE_OPTS $HBASE_THRIFT_OPTS" fi +elif [ "$COMMAND" = "kafkaproxy" ] ; then + CLASS='org.apache.hadoop.hbase.kafka.KafkaProxy' + if [ "$1" != "stop" ] ; then + HBASE_OPTS="$HBASE_OPTS $HBASE_KAFKA_OPTS" + fi elif [ "$COMMAND" = "rest" ] ; then CLASS='org.apache.hadoop.hbase.rest.RESTServer' if [ "$1" != "stop" ] ; then diff --git a/conf/kafka-route-rules.xml b/conf/kafka-route-rules.xml new file mode 100755 index 0000000000..5abab9d2f8 --- /dev/null +++ b/conf/kafka-route-rules.xml @@ -0,0 +1,65 @@ + + + + + diff --git a/hbase-assembly/pom.xml b/hbase-assembly/pom.xml index eeeff4a687..7ccc82674f 100644 --- a/hbase-assembly/pom.xml +++ b/hbase-assembly/pom.xml @@ -332,11 +332,19 @@ org.apache.hbase hbase-zookeeper + + + org.apache.hbase + hbase-kafka-model jline jline + + org.apache.hbase + hbase-kafka-proxy + diff --git a/hbase-kafka-model/pom.xml b/hbase-kafka-model/pom.xml new file mode 100755 index 0000000000..1fa0c31211 --- /dev/null +++ b/hbase-kafka-model/pom.xml @@ -0,0 +1,225 @@ + + + + 4.0.0 + + + hbase-build-configuration + org.apache.hbase + 3.0.0-SNAPSHOT + ../hbase-build-configuration + + + + hbase-kafka-model + Apache HBase - Model Objects for Kafka Proxy + Model objects that represent HBase mutations + + + + + + org.apache.avro + avro + + + + + + ${project.basedir}/target/java + + + src/main/resources/ + + hbase-default.xml + + + + + + src/test/resources/META-INF/ + META-INF/ + + NOTICE + + true + + + + + org.apache.avro + avro-maven-plugin + ${avro.version} + + + generate-sources + + schema + + + ${project.basedir}/src/main/avro/ + ${project.basedir}/target/java/ + + **/*.avro + + + + + + + + + org.apache.maven.plugins + maven-remote-resources-plugin + + + org.apache.maven.plugins + maven-site-plugin + + true + + + + + maven-assembly-plugin + + true + + + + maven-surefire-plugin + + + + listener + org.apache.hadoop.hbase.ResourceCheckerJUnitListener + + + + + + + org.apache.maven.plugins + maven-source-plugin + + + hbase-default.xml + + + + + + + + + org.eclipse.m2e + lifecycle-mapping + 1.0.0 + + + + + + org.apache.maven.plugins + maven-antrun-plugin + [${maven.antrun.version}] + + run + + + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + [2.8,) + + build-classpath + + + + + + + + + org.apache.maven.plugins + 
maven-compiler-plugin + [3.2,) + + compile + + + + + + + + + + + + + + + + + + + apache-release + + + + org.apache.maven.plugins + maven-resources-plugin + + + license-javadocs + prepare-package + + copy-resources + + + ${project.build.directory}/apidocs + + + src/main/javadoc/META-INF/ + META-INF/ + + NOTICE + + true + + + + + + + + + + + diff --git a/hbase-kafka-model/src/main/avro/HbaseKafkaEvent.avro b/hbase-kafka-model/src/main/avro/HbaseKafkaEvent.avro new file mode 100755 index 0000000000..ec88627441 --- /dev/null +++ b/hbase-kafka-model/src/main/avro/HbaseKafkaEvent.avro @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +{"namespace": "org.apache.hadoop.hbase.kafka", + "type": "record", + "name": "HBaseKafkaEvent", + "fields": [ + {"name": "key", "type": "bytes"}, + {"name": "timestamp", "type": "long" }, + {"name": "delete", "type": "boolean" }, + {"name": "value", "type": "bytes"}, + {"name": "qualifier", "type": "bytes"}, + {"name": "family", "type": "bytes"}, + {"name": "table", "type": "bytes"} + ] +} diff --git a/hbase-kafka-proxy/pom.xml b/hbase-kafka-proxy/pom.xml new file mode 100755 index 0000000000..81e3658679 --- /dev/null +++ b/hbase-kafka-proxy/pom.xml @@ -0,0 +1,236 @@ + + + + 4.0.0 + + hbase-build-configuration + org.apache.hbase + 3.0.0-SNAPSHOT + ../hbase-build-configuration + + + hbase-kafka-proxy + Apache HBase - Kafka Proxy + Proxy that forwards HBase replication events to a Kakfa broker + + + + + org.apache.maven.plugins + maven-remote-resources-plugin + + + org.apache.maven.plugins + maven-site-plugin + + true + + + + + maven-assembly-plugin + + true + + + + maven-surefire-plugin + + + + listener + org.apache.hadoop.hbase.ResourceCheckerJUnitListener + + + + + + + + + + org.eclipse.m2e + lifecycle-mapping + 1.0.0 + + + + + + org.apache.maven.plugins + maven-antrun-plugin + [${maven.antrun.version}] + + run + + + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + [2.8,) + + build-classpath + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + [3.2,) + + compile + + + + + + + + + + + + + + + + org.apache.avro + avro + + + org.apache.hbase + hbase-kafka-model + compile + + + org.apache.hbase + hbase-common + + + org.apache.hbase + hbase-common + test-jar + test + + + org.apache.hbase + hbase-client + + + org.apache.hbase + hbase-zookeeper + + + org.apache.hbase + hbase-server + + + org.apache.hbase + hbase-annotations + ${project.version} + + + org.apache.kafka + kafka-clients + 1.0.0 + + + + org.apache.commons + commons-lang3 + + + org.apache.commons + commons-collections4 + ${collections.version} + compile + + + commons-io + commons-io + compile + + + + + + + apache-release + + + + org.apache.maven.plugins + maven-resources-plugin + + + license-javadocs + prepare-package + + copy-resources + + + ${project.build.directory}/apidocs + + + src/main/javadoc/META-INF/ + META-INF/ + + NOTICE + + true + + + + + + + + + + + + skipKafkaProxyTests + + + skipKafkaProxyTests + + + + true + true + + + + + + + diff --git a/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DropRule.java b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DropRule.java new file mode 100755 index 0000000000..8bc1effed1 --- /dev/null +++ b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DropRule.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.kafka; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Rule that indicates the Cell should not be replicated + */ +@InterfaceAudience.Private +public class DropRule extends Rule { + public DropRule() { + } +} diff --git a/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DumpToStringListener.java b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DumpToStringListener.java new file mode 100755 index 0000000000..0f467eb683 --- /dev/null +++ b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DumpToStringListener.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.kafka; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.Properties; +import java.util.stream.Collectors; + +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.VersionInfo; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.org.apache.commons.cli.BasicParser; +import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; +import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter; +import org.apache.hbase.thirdparty.org.apache.commons.cli.Options; +import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException; + + +/** + * connects to kafka and reads from the passed in topics. Parses each message into an avro object + * and dumps it to the console. + */ +@InterfaceAudience.Private +public final class DumpToStringListener { + private static final Logger LOG = LoggerFactory.getLogger(DumpToStringListener.class); + + private DumpToStringListener(){ + + } + + public static void main(String[] args) { + LOG.info("***** STARTING service '" + DumpToStringListener.class.getSimpleName() + "' *****"); + VersionInfo.logVersion(); + + Options options = new Options(); + options.addOption("k", "kafkabrokers", true, "Kafka Brokers " + + "(comma delimited)"); + options.addOption("t", "kafkatopics", true,"Kafka Topics " + + "to subscribe to (comma delimited)"); + CommandLine commandLine = null; + try { + commandLine = new BasicParser().parse(options, args); + } catch (ParseException e) { + LOG.error("Could not parse: ", e); + printUsageAndExit(options, -1); + } + SpecificDatumReader dreader = + new SpecificDatumReader<>(HBaseKafkaEvent.SCHEMA$); + + String topic = commandLine.getOptionValue('t'); + Properties props = new Properties(); + props.put("bootstrap.servers", commandLine.getOptionValue('k')); + props.put("group.id", "hbase kafka test tool"); + props.put("key.deserializer", ByteArrayDeserializer.class.getName()); + props.put("value.deserializer", ByteArrayDeserializer.class.getName()); + + try (KafkaConsumer consumer = new KafkaConsumer<>(props);){ + consumer.subscribe(Arrays.stream(topic.split(",")).collect(Collectors.toList())); + + while (true) { + ConsumerRecords records = consumer.poll(10000); + Iterator> it = records.iterator(); + while (it.hasNext()) { + ConsumerRecord record = it.next(); + BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(record.value(), null); + try { + HBaseKafkaEvent event = dreader.read(null, decoder); + LOG.debug("key :" + Bytes.toString(record.key()) + " value " + event); + } catch (Exception e) { + LOG.error("caught exception ",e); + System.exit(-1); + } + } + } + } + } + + private static void printUsageAndExit(Options options, int exitCode) { + 
HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("hbase " + DumpToStringListener.class.getName(), "", options, + "\n[--kafkabrokers <kafka brokers (comma delimited)>] " + + "[-k <kafka brokers (comma delimited)>] \n", true); + System.exit(exitCode); + } + +} diff --git a/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaBridgeConnection.java b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaBridgeConnection.java new file mode 100755 index 0000000000..55ded5cebf --- /dev/null +++ b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaBridgeConnection.java @@ -0,0 +1,216 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.kafka; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Properties; +import java.util.concurrent.ExecutorService; + +import org.apache.avro.io.DatumWriter; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.BufferedMutatorParams; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableBuilder; +import org.apache.hadoop.hbase.security.User; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + + +/** + * a alternative implementation of a connection object that forwards the mutations to a kafka queue + * depending on the routing rules (see kafka-route-rules.xml). + * */ +@InterfaceAudience.Private +public class KafkaBridgeConnection implements Connection { + private static final Logger LOG = LoggerFactory.getLogger(KafkaBridgeConnection.class); + + private final Configuration conf; + private final User user; + private final ExecutorService pool; + private volatile boolean closed = false; + private TopicRoutingRules routingRules; + private Producer producer; + private DatumWriter avroWriter = + new SpecificDatumWriter<>(HBaseKafkaEvent.getClassSchema()); + + + /** + * Public constructor + * @param conf hbase configuration + * @param pool executor pool + * @param user user who requested connection + * @throws IOException on error + */ + public KafkaBridgeConnection(Configuration conf, + ExecutorService pool, + User user) throws IOException { + this.conf = conf; + this.user = user; + this.pool = pool; + setupRules(); + startKafkaConnection(); + } + + /** + * for testing. 
+ * @param conf hbase configuration + * @param pool executor service + * @param user user with connection + * @param routingRules a set of routing rules + * @param producer a kafka producer + * @throws IOException on error + */ + public KafkaBridgeConnection(Configuration conf, + ExecutorService pool, + User user, + TopicRoutingRules routingRules, + Producer producer) + throws IOException { + this.conf = conf; + this.user = user; + this.pool = pool; + this.producer=producer; + this.routingRules=routingRules; + } + + private void setupRules() throws IOException { + String file = this.conf.get(KafkaProxy.KAFKA_PROXY_RULES_FILE); + routingRules = new TopicRoutingRules(); + try (FileInputStream fin = new FileInputStream(file);){ + routingRules.parseRules(fin); + } + } + + private void startKafkaConnection() throws IOException { + Properties configProperties = new Properties(); + + String kafkaPropsFile = conf.get(KafkaProxy.KAFKA_PROXY_KAFKA_PROPERTIES,""); + if (!StringUtils.isEmpty(kafkaPropsFile)){ + try (FileInputStream fs = new java.io.FileInputStream( + new File(kafkaPropsFile))){ + configProperties.load(fs); + } + } else { + String kafkaServers =conf.get(KafkaProxy.KAFKA_PROXY_KAFKA_BROKERS); + configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers); + } + + configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.ByteArraySerializer"); + configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.ByteArraySerializer"); + this.producer = new KafkaProducer(configProperties); + } + + + + @Override + public void abort(String why, Throwable e) {} + + @Override + public boolean isAborted() { + return false; + } + + @Override + public Configuration getConfiguration() { + return this.conf; + } + + @Override + public BufferedMutator getBufferedMutator(TableName tableName) throws IOException { + return null; + } + + @Override + public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException { + return null; + } + + @Override + public RegionLocator getRegionLocator(TableName tableName) throws IOException { + return null; + } + + @Override + public Admin getAdmin() throws IOException { + return null; + } + + @Override + public void close() throws IOException { + if (!this.closed) { + this.closed = true; + this.producer.close(); + } + } + + @Override + public boolean isClosed() { + return this.closed; + } + + @Override + public TableBuilder getTableBuilder(final TableName tn, ExecutorService pool) { + if (isClosed()) { + throw new RuntimeException("KafkaBridgeConnection is closed."); + } + final Configuration passedInConfiguration = getConfiguration(); + return new TableBuilder() { + @Override + public TableBuilder setOperationTimeout(int timeout) { + return null; + } + + @Override + public TableBuilder setRpcTimeout(int timeout) { + return null; + } + + @Override + public TableBuilder setReadRpcTimeout(int timeout) { + return null; + } + + @Override + public TableBuilder setWriteRpcTimeout(int timeout) { + return null; + } + + @Override + public Table build() { + return new KafkaTableForBridge(tn,passedInConfiguration,routingRules,producer,avroWriter) ; + } + }; + } + +} diff --git a/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaProxy.java b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaProxy.java new file mode 100755 index 0000000000..fe21e9c70d --- /dev/null +++ 
b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaProxy.java @@ -0,0 +1,347 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.kafka; + +import java.io.File; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.VersionInfo; +import org.apache.yetus.audience.InterfaceAudience; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.org.apache.commons.cli.BasicParser; +import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; +import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter; +import org.apache.hbase.thirdparty.org.apache.commons.cli.Options; +import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException; + + + +/** + * hbase to kafka bridge. + * + * Starts up a region server and receives replication events, just like a peer + * cluster member. 
It takes the events and cell by cell determines how to + * route them (see kafka-route-rules.xml) + */ +@InterfaceAudience.Private +public final class KafkaProxy { + private static final Logger LOG = LoggerFactory.getLogger(KafkaProxy.class); + + public static final String KAFKA_PROXY_RULES_FILE = "kafkaproxy.rules.file"; + public static final String KAFKA_PROXY_KAFKA_PROPERTIES = "kafkaproxy.kafka.properties"; + public static final String KAFKA_PROXY_KAFKA_BROKERS = "kafkaproxy.kafka.brokers"; + + private static Map DEFAULT_PROPERTIES = new HashMap<>(); + + static { + DEFAULT_PROPERTIES.put("hbase.cluster.distributed","true"); + DEFAULT_PROPERTIES.put("zookeeper.znode.parent","/kafkaproxy"); + DEFAULT_PROPERTIES.put("hbase.regionserver.port","17020"); + DEFAULT_PROPERTIES.put("hbase.regionserver.info.port","17010"); + DEFAULT_PROPERTIES.put("hbase.client.connection.impl", + "org.apache.hadoop.hbase.kafka.KafkaBridgeConnection"); + DEFAULT_PROPERTIES.put("hbase.regionserver.admin.service","false"); + DEFAULT_PROPERTIES.put("hbase.regionserver.client.service","false"); + DEFAULT_PROPERTIES.put("hbase.wal.provider", + "org.apache.hadoop.hbase.wal.DisabledWALProvider"); + DEFAULT_PROPERTIES.put("hbase.regionserver.workers","false"); + DEFAULT_PROPERTIES.put("hfile.block.cache.size","0.0001"); + DEFAULT_PROPERTIES.put("hbase.mob.file.cache.size","0"); + DEFAULT_PROPERTIES.put("hbase.masterless","true"); + DEFAULT_PROPERTIES.put("hbase.regionserver.metahandler.count","1"); + DEFAULT_PROPERTIES.put("hbase.regionserver.replication.handler.count","1"); + DEFAULT_PROPERTIES.put("hbase.regionserver.handler.count","1"); + DEFAULT_PROPERTIES.put("hbase.ipc.server.read.threadpool.size","3"); + } + + private static void printUsageAndExit(Options options, int exitCode) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("hbase kafkaproxy start", "", options, + "\nTo run the kafka proxy as a daemon, execute " + + "hbase-daemon.sh start|stop kafkaproxy " + + "[--kafkabrokers ] " + + "[-b ] " + + "[--routerulesfile ] " + + "[-r ] " + + "[--kafkaproperties ] " + + "[-f ] " + + "[--peername name of hbase peer to use (defaults to hbasekafka)] " + + "[-p name of hbase peer to use (defaults to hbasekafka)] " + + "[--znode root znode (defaults to /kafkaproxy)] " + + "[-z root znode (defaults to /kafkaproxy)] " + + + "[--enablepeer enable peer on startup (defaults to false)] " + + "[-e enable peer on startup (defaults to false)] " + + + "[--auto auto create peer] " + + "[-a auto create peer] \n", true); + System.exit(exitCode); + } + + /** + * private constructor + */ + private KafkaProxy() { + + } + + /** + * Start the service + * @param args program arguments + * @throws Exception on error + */ + public static void main(String[] args) throws Exception { + + Map otherProps = new HashMap<>(); + + Options options = new Options(); + + options.addOption("b", "kafkabrokers", true, + "Kafka Brokers (comma delimited)"); + options.addOption("r", "routerulesfile", true, + "file that has routing rules (defaults to conf/kafka-route-rules.xml"); + options.addOption("f", "kafkaproperties", true, + "Path to properties file that has the kafka connection properties"); + options.addOption("p", "peername", true, + "Name of hbase peer"); + options.addOption("z", "znode", true, + "root zode to use in zookeeper (defaults to /kafkaproxy)"); + options.addOption("a", "autopeer", false, + "Create a peer automatically to the hbase cluster"); + options.addOption("e", "enablepeer", false, + "enable peer on startup 
(defaults to false)"); + + + LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName()); + VersionInfo.logVersion(); + + Configuration conf = HBaseConfiguration.create(); + CommandLine commandLine = null; + try { + commandLine = new BasicParser().parse(options, args); + } catch (ParseException e) { + LOG.error("Could not parse: ", e); + printUsageAndExit(options, -1); + } + + String peer=""; + if (!commandLine.hasOption('p')) { + System.err.println("hbase peer id is required"); + System.exit(-1); + } + + peer = commandLine.getOptionValue('p'); + + boolean createPeer = false; + boolean enablePeer = false; + + if (commandLine.hasOption('a')){ + createPeer=true; + } + + if (commandLine.hasOption("e")){ + enablePeer=true; + } + + String rulesFile = StringUtils.defaultIfBlank( + commandLine.getOptionValue("r"),"kafka-route-rules.xml"); + + if (!new File(rulesFile).exists()){ + if (KafkaProxy.class.getClassLoader().getResource(rulesFile)!=null){ + rulesFile = KafkaProxy.class.getClassLoader().getResource(rulesFile).getFile(); + } else { + System.err.println("Rules file " + rulesFile + + " is invalid"); + System.exit(-1); + } + } + + otherProps.put(KafkaProxy.KAFKA_PROXY_RULES_FILE,rulesFile); + + if (commandLine.hasOption('f')){ + otherProps.put(KafkaProxy.KAFKA_PROXY_KAFKA_PROPERTIES,commandLine.getOptionValue('f')); + } else if (commandLine.hasOption('b')){ + otherProps.put(KafkaProxy.KAFKA_PROXY_KAFKA_BROKERS,commandLine.getOptionValue('b')); + } else { + System.err.println("Kafka connection properites or brokers must be specified"); + System.exit(-1); + } + + String zookeeperQ = conf.get("hbase.zookeeper.quorum") + ":" + + conf.get("hbase.zookeeper.property.clientPort"); + + ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(20000, 20); + + try (CuratorFramework zk = CuratorFrameworkFactory.newClient(zookeeperQ, retryPolicy); + ) { + zk.start(); + String rootZnode = "/kafkaproxy"; + setupZookeeperZnodes(zk,rootZnode,peer); + checkForOrCreateReplicationPeer(conf,zk,rootZnode,peer,createPeer,enablePeer); + } + + @SuppressWarnings("unchecked") + Class regionServerClass = (Class) conf + .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class); + + List allArgs = DEFAULT_PROPERTIES.keySet().stream() + .map((argKey)->("-D"+argKey+"="+ DEFAULT_PROPERTIES.get(argKey))) + .collect(Collectors.toList()); + + otherProps.keySet().stream() + .map((argKey)->("-D"+argKey+"="+ otherProps.get(argKey))) + .forEach((item)->allArgs.add(item)); + + Arrays.stream(args) + .filter((arg)->(arg.startsWith("-D")||arg.equals("start"))) + .forEach((arg)->allArgs.add(arg)); + + LOG.info("Args passed to region server "+allArgs); + + String[] newArgs=new String[allArgs.size()]; + allArgs.toArray(newArgs); + + new HRegionServerCommandLine(regionServerClass).doMain(newArgs); + } + + + /** + * Set up the needed znodes under the rootZnode + * @param zk CuratorFramework framework instance + * @param rootZnode Root znode + * @throws Exception If an error occurs + */ + public static void setupZookeeperZnodes(CuratorFramework zk, String rootZnode,String peer) + throws Exception { + // always gives the same uuid for the same name + UUID uuid = UUID.nameUUIDFromBytes(Bytes.toBytes(peer)); + String newValue = uuid.toString(); + byte []uuidBytes = Bytes.toBytes(newValue); + String idPath=rootZnode+"/hbaseid"; + if (zk.checkExists().forPath(idPath) == null) { + zk.create().creatingParentsIfNeeded().forPath(rootZnode + + "/hbaseid",uuidBytes); + } else { + // If the znode is there already make sure it 
has the + // expected value for the peer name. + byte[] znodeBytes = zk.getData().forPath(idPath).clone(); + if (!Bytes.equals(znodeBytes,uuidBytes)){ + String oldValue = Bytes.toString(znodeBytes); + LOG.warn("znode "+idPath+" has unexpected value "+ oldValue + +" expecting " + newValue + " " + + " (did the peer name for the proxy change?) " + + "Updating value"); + zk.setData().forPath(idPath, uuidBytes); + } + } + } + + /** + * Poll for the configured peer or create it if it does not exist + * (controlled by createIfMissing) + * @param hbaseConf the hbase configuratoin + * @param zk CuratorFramework object + * @param basePath base znode. + * @param peerName id if the peer to check for/create + * @param enablePeer if the peer is detected or created, enable it. + * @param createIfMissing if the peer doesn't exist, create it and peer to it. + */ + public static void checkForOrCreateReplicationPeer(Configuration hbaseConf, + CuratorFramework zk, + String basePath, + String peerName, boolean createIfMissing, + boolean enablePeer) { + try (Connection conn = ConnectionFactory.createConnection(hbaseConf); + Admin admin = conn.getAdmin()) { + + boolean peerThere = false; + boolean printedPeerMessage=false; + + while (!peerThere) { + try { + ReplicationPeerConfig peerConfig = admin.getReplicationPeerConfig(peerName); + if (peerConfig !=null) { + peerThere=true; + } + } catch (ReplicationPeerNotFoundException e) { + if (createIfMissing) { + ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); + // get the current cluster's ZK config + String zookeeperQ = hbaseConf.get("hbase.zookeeper.quorum") + + ":" + + hbaseConf.get("hbase.zookeeper.property.clientPort"); + String znodePath = zookeeperQ + ":"+basePath; + ReplicationPeerConfig rconf = builder.setClusterKey(znodePath).build(); + admin.addReplicationPeer(peerName, rconf); + peerThere = true; + } + } + + if (peerThere) { + if (enablePeer){ + LOG.info("enable peer," + peerName); + admin.enableReplicationPeer(peerName); + } + break; + } else { + if (!printedPeerMessage){ + LOG.info("peer "+ + peerName+" not found, service will not completely start until the peer exists"); + printedPeerMessage=true; + } + + } + Thread.sleep(5000); + } + + LOG.info("found replication peer " + peerName); + + } catch (Exception e) { + LOG.error("Exception running proxy ",e); + } + } +} + diff --git a/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java new file mode 100755 index 0000000000..b7c2d69980 --- /dev/null +++ b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.kafka; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Future; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.EncoderFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; + + +@InterfaceAudience.Private +/** + * Alternative table implementation provided by KafkaBridgeConnection that forwards all mutations + * provided to the batch() method to a configured kafka topic. 
+ */ +public class KafkaTableForBridge implements Table { + private Logger LOG = LoggerFactory.getLogger(KafkaTableForBridge.class); + + private final Configuration conf; + private final TableName tableName; + private byte[] tableAsBytes; + + private Producer producer; + private TopicRoutingRules routingRules; + + private DatumWriter avroWriter; + + private static final class CheckMutation { + ByteBuffer qualifier; + ByteBuffer family; + Cell cell; + List topics = new ArrayList<>(); + } + + public KafkaTableForBridge(TableName tableName, + Configuration conf, + TopicRoutingRules routingRules, + Producer producer, + DatumWriter avroWriter){ + this.conf=conf; + this.tableName=tableName; + this.tableAsBytes=this.tableName.toBytes(); + this.routingRules=routingRules; + this.producer=producer; + this.avroWriter=avroWriter; + } + + private Stream> processMutation(CheckMutation check, + boolean isDelete){ + HBaseKafkaEvent event = new HBaseKafkaEvent(); + event.setKey(ByteBuffer.wrap(check.cell.getRowArray(), + check.cell.getRowOffset(), + check.cell.getRowLength())); + event.setTable(ByteBuffer.wrap(tableAsBytes)); + event.setDelete(isDelete); + event.setTimestamp(check.cell.getTimestamp()); + event.setFamily(check.family); + event.setQualifier(check.qualifier); + event.setValue(ByteBuffer.wrap(check.cell.getValueArray(), + check.cell.getValueOffset(), + check.cell.getValueLength())); + + return check.topics.stream() + .map((topic)->new Pair(topic,event)); + } + + private boolean keep(CheckMutation ret){ + if (!routingRules.isExclude(this.tableName,ret.family, ret.qualifier)){ + return true; + } + return false; + } + + private CheckMutation setTopics(CheckMutation ret){ + ret.topics= routingRules.getTopics(this.tableName,ret.family,ret.qualifier); + return ret; + } + + private ProducerRecord toByteArray(ByteArrayOutputStream bout, + Pair event, + BinaryEncoder encoder) throws IOException { + try { + bout.reset(); + BinaryEncoder encoderUse = EncoderFactory.get().binaryEncoder(bout, encoder); + avroWriter.write(event.getSecond(), encoderUse); + encoder.flush(); + bout.flush(); + return new ProducerRecord(event.getFirst(), + event.getSecond().getKey().array(), + bout.toByteArray()); + } catch (Exception e){ + throw new IOException(e); + } + } + + @Override + public void batch(final List actions, Object[] results) + throws IOException, InterruptedException { + + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + BinaryEncoder encoderUse = EncoderFactory.get().binaryEncoder(bout, null); + + LOG.debug("got {} inputs ",actions.size()); + + List> sends = new ArrayList<>(); + + List> messages = actions.stream() + .filter((row)->row instanceof Mutation) + .map((row)->(Mutation)row) + .flatMap((row)->{ + boolean isDelete = row instanceof Delete; + return row.getFamilyCellMap().keySet().stream() + .flatMap((family)->row.getFamilyCellMap().get(family).stream()) + .map((cell)->{ + CheckMutation ret = new CheckMutation(); + ret.family=ByteBuffer.wrap(cell.getFamilyArray(), + cell.getFamilyOffset(), + cell.getFamilyLength()); + ret.qualifier=ByteBuffer.wrap(cell.getQualifierArray() + ,cell.getQualifierOffset(), + cell.getQualifierLength()); + ret.cell=cell; + return ret; + }) + .filter((check)->keep(check)) + .map((check)-> setTopics(check)) + .filter((check)->!CollectionUtils.isEmpty(check.topics)) + .flatMap((check)->processMutation(check,isDelete)); + }).collect(Collectors.toList()); + + for (Pair event : messages){ + ProducerRecord producerRecord = toByteArray(bout,event,encoderUse); + 
sends.add(producer.send(producerRecord)); + } + + this.producer.flush(); + + for (Future sendRecord : sends){ + try { + sendRecord.get(); + } catch (Exception e){ + LOG.error("Exception caught when getting result",e); + throw new IOException(e); + } + } + + } + + @Override + public TableName getName() { + return this.tableName; + } + + @Override + public Configuration getConfiguration() { + return this.conf; + } + + @Override + public HTableDescriptor getTableDescriptor() throws IOException { + return null; + } + + @Override + public TableDescriptor getDescriptor() throws IOException { + return null; + } + + @Override + public void close() { + + } +} diff --git a/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/Rule.java b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/Rule.java new file mode 100755 index 0000000000..f7fdd7175c --- /dev/null +++ b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/Rule.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.kafka; + +import java.nio.ByteBuffer; + +import org.apache.commons.lang3.ArrayUtils; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.yetus.audience.InterfaceAudience; + + + +/** + * Implements the matching logic for a rule + */ +@InterfaceAudience.Private +public abstract class Rule { + TableName tableName; + private byte [] columnFamily; + private byte [] qualifier; + + boolean qualifierStartsWith = false; + boolean qualifierEndsWith = false; + + byte []ast = Bytes.toBytes("*"); + + /** + * Indicates if the table,column family, and qualifier match the rule + * @param tryTable table name to test + * @param tryColumFamily column family to test + * @param tryQualifier qualifier to test + * @return true if values match the rule + */ + public boolean match(TableName tryTable, ByteBuffer tryColumFamily, ByteBuffer tryQualifier) { + boolean tableMatch = tableMatch(tryTable); + boolean columnFamilyMatch = columnFamilyMatch(tryColumFamily); + boolean qualfierMatch = qualifierMatch(tryQualifier); + + return tableMatch && columnFamilyMatch && qualfierMatch; + } + + /** + * Test if the qualifier matches + * @param tryQualifier qualifier to test + * @return true if the qualifier matches + */ + public boolean qualifierMatch(ByteBuffer tryQualifier) { + + if (qualifier != null) { + if (qualifierStartsWith && qualifierEndsWith) { + return (startsWith(tryQualifier, this.qualifier) || endsWith(tryQualifier, this.qualifier)); + } else if (qualifierStartsWith) { + return startsWith(tryQualifier, this.qualifier); + } else if (qualifierEndsWith) { + return endsWith(tryQualifier, this.qualifier); + } else { + return Bytes.equals(this.qualifier, tryQualifier); + } + } + return true; + } + + /** + * Test if the column family matches the rule + * @param tryColumFamily column family to test + * @return true if the column family matches the rule + */ + public boolean columnFamilyMatch(ByteBuffer tryColumFamily) { + if (columnFamily != null) { + return Bytes.equals(this.columnFamily, tryColumFamily); + } + return true; + } + + /** + * Test if the table matches the table in the rule + * @param tryTable table name to test + * @return true if the table matches the rule + */ + public boolean tableMatch(TableName tryTable) { + if (tableName == null) { + return true; + } + return (tryTable.equals(this.tableName)); + } + + /** + * set the column family for the rule + * @param columnFamily column family to set + */ + public void setColumnFamily(byte [] columnFamily) { + this.columnFamily = columnFamily; + } + + /** + * set the qualifier value for the rule + * @param qualifier qualifier to set + */ + public void setQualifier(byte []qualifier) { + this.qualifier = qualifier; + if (startsWith(ByteBuffer.wrap(qualifier), ast)) { + qualifierEndsWith = true; + this.qualifier = ArrayUtils.subarray(this.qualifier, ast.length, this.qualifier.length); + } + if (endsWith(ByteBuffer.wrap(qualifier), ast)) { + qualifierStartsWith = true; + this.qualifier = ArrayUtils.subarray(this.qualifier, 0, this.qualifier.length - ast.length); + } + if ((qualifierStartsWith) || (qualifierEndsWith)) { + if (this.qualifier.length == 0) { 
+ this.qualifier = null; + } + } + + } + + /** + * Tests if data starts with startsWith + * @param data byte array to test + * @param startsWith array that we want to see if data starts with + * @return true if data starts with startsWith + */ + public static boolean startsWith(ByteBuffer data, byte [] startsWith) { + if (startsWith.length > data.limit()) { + return false; + } + ByteBuffer test = data.duplicate(); + for (int i = 0 ; i < startsWith.length; i++){ + if (test.get() != startsWith[i]){ + return false; + } + } + return true; + } + + /** + * Tests if data ends with endsWith + * @param data byte array to test + * @param endsWith array that we want to see if data ends with + * @return true if data ends with endsWith + */ + public static boolean endsWith(ByteBuffer data, byte [] endsWith) { + if (endsWith.length > data.limit()) { + return false; + } + + ByteBuffer test = data.duplicate(); + test.position(test.limit()-endsWith.length); + for (int i = 0 ; i < endsWith.length;i++){ + if (test.get() != endsWith[i]){ + return false; + } + } + return true; + } + + /** + * get the table for the rule + * @return tablename for ule + */ + public TableName getTableName() { + return tableName; + } + + /** + * set the table for the rule + * @param tableName to set + */ + public void setTableName(TableName tableName) { + this.tableName = tableName; + } + + /** + * get the column family for the rule + * @return column family + */ + public byte[] getColumnFamily() { + return columnFamily; + } + + /** + * get the qualifier for the rule + * @return qualfier + */ + public byte[] getQualifier() { + return qualifier; + } + + + /** + * indicates if the qualfier is a wildcard like *foo + * @return true if rule is like *foo + */ + public boolean isQualifierEndsWith() { + return qualifierEndsWith; + } + + /** + * indicates if the qualfier is a wildcard like foo* + * @return true if rule is like foo* + */ + public boolean isQualifierStartsWith() { + return qualifierStartsWith; + + } +} diff --git a/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/TopicRoutingRules.java b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/TopicRoutingRules.java new file mode 100755 index 0000000000..5a02081c1f --- /dev/null +++ b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/TopicRoutingRules.java @@ -0,0 +1,238 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.kafka; + + +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.yetus.audience.InterfaceAudience; +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.NodeList; + +/** + * The topic routing/drop rules. + * + * <rules> + * <rule .... /> + * + * </rules> + * + * + * + * A wildcard can only be at the beginning or at the end (can be at both sides). + * + * drop rules are always evaluated first. + * + * drop examples: + * <rule action="drop" table="default:MyTable" /> + * Do not send replication events for table MyTable + * + * <rule action="drop" table="default:MyTable" columnFamily="data"/> + * Do not send replication events for table MyTable's column family data + * + * <rule action="drop" table="default:MyTable" columnFamily="data" qualfier="dhold:*"/> + * Do not send replication events for any qualiifier on table MyTable with column family data + * + * routeRules examples: + * + * <rule action="routeRules" table="default:MyTable" topic="mytopic"/> + * routeRules all replication events for table default:Mytable to topic mytopic + * + * <rule action="routeRules" table="default:MyTable" columnFamily="data" topic="mytopic"/> + * routeRules all replication events for table default:Mytable column family data to topic mytopic + * + * <rule action="routeRules" table="default:MyTable" columnFamily="data" topic="mytopic" + * qualifier="hold:*"/> + * routeRules all replication events for qualifiers that start with hold: for table + * default:Mytable column family data to topic mytopic + */ +@InterfaceAudience.Private +public class TopicRoutingRules { + + private List dropRules = new ArrayList<>(); + private List routeRules = new ArrayList<>(); + + private File sourceFile; + + /** + * used for testing + */ + public TopicRoutingRules() { + + } + + /** + * construct rule set from file + * @param source file that countains the rule set + * @throws Exception if load fails + */ + public TopicRoutingRules(File source) throws Exception { + this.sourceFile = source; + this.reloadIfFile(); + } + + /** + * Reload the ruleset if it was parsed from a file + * @throws Exception error loading rule set + */ + public void reloadIfFile() throws Exception { + if (this.sourceFile!=null){ + List dropRulesSave = this.dropRules; + List routeRulesSave = this.routeRules; + + try (FileInputStream fin = new FileInputStream(this.sourceFile)) { + List dropRulesNew = new ArrayList<>(); + List routeRulesNew = new ArrayList<>(); + + parseRules(fin,dropRulesNew,routeRulesNew); + + this.dropRules = dropRulesNew; + this.routeRules = routeRulesNew; + + } catch (Exception e){ + // roll back + this.dropRules=dropRulesSave; + this.routeRules=routeRulesSave; + // re-throw + throw e; + } + } + } + + /** + * parse rules manually from an input stream + * @param input InputStream that contains rule text + */ + public void parseRules(InputStream input) { 
+ List dropRulesNew = new ArrayList<>(); + List routeRulesNew = new ArrayList<>(); + parseRules(input,dropRulesNew,routeRulesNew); + this.dropRules = dropRulesNew; + this.routeRules = routeRulesNew; + } + + /** + * Parse the XML in the InputStream into route/drop rules and store them in the passed in Lists + * @param input inputstream the contains the ruleset + * @param dropRules list to accumulate drop rules + * @param routeRules list to accumulate route rules + */ + public void parseRules(InputStream input,List dropRules, List routeRules) { + try { + DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance(); + DocumentBuilder dBuilder = dbFactory.newDocumentBuilder(); + Document doc = dBuilder.parse(input); + NodeList nodList = doc.getElementsByTagName("rule"); + for (int i = 0; i < nodList.getLength(); i++) { + if (nodList.item(i) instanceof Element) { + parseRule((Element) nodList.item(i),dropRules,routeRules); + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Parse a individual rule from a Element. + * @param n the element + * @param dropRules list to accumulate drop rules + * @param routeRules list to accumulate route rules + */ + public void parseRule(Element n, List dropRules, List routeRules) { + Rule r = null; + if (n.getAttribute("action").equals("drop")) { + r = new DropRule(); + dropRules.add((DropRule) r); + } else { + r = new TopicRule(n.getAttribute("topic")); + routeRules.add((TopicRule) r); + } + if (n.hasAttribute("table")) { + r.setTableName(TableName.valueOf(n.getAttribute("table"))); + } + if (n.hasAttribute("columnFamily")) { + r.setColumnFamily(Bytes.toBytes(n.getAttribute("columnFamily"))); + } + if (n.hasAttribute("qualifier")) { + String qual = n.getAttribute("qualifier"); + r.setQualifier(Bytes.toBytes(qual)); + } + } + + /** + * Indicates if a cell mutation should be dropped instead of routed to kafka. + * @param table table name to check + * @param columnFamily column family to check + * @param qualifer qualifier name to check + * @return if the mutation should be dropped instead of routed to Kafka + */ + public boolean isExclude(final TableName table, final ByteBuffer columnFamily, + final ByteBuffer qualifer) { + for (DropRule r : getDropRules()) { + if (r.match(table, columnFamily, qualifer)) { + return true; + } + } + return false; + } + + /** + * Get topics for the table/column family/qualifier combination + * @param table table name to check + * @param columnFamily column family to check + * @param qualifer qualifier name to check + * @return list of topics that match the passed in values (or empty for none). 
+ */ + public List getTopics(TableName table, ByteBuffer columnFamily, ByteBuffer qualifer) { + List ret = new ArrayList<>(); + for (TopicRule r : getRouteRules()) { + if (r.match(table, columnFamily, qualifer)) { + ret.addAll(r.getTopics()); + } + } + + return ret; + } + + /** + * returns all the drop rules (used for testing) + * @return drop rules + */ + public List getDropRules() { + return dropRules; + } + + /** + * returns all the route rules (used for testing) + * @return route rules + */ + public List getRouteRules() { + return routeRules; + } +} diff --git a/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/TopicRule.java b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/TopicRule.java new file mode 100755 index 0000000000..5e5b6bfd94 --- /dev/null +++ b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/TopicRule.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.kafka; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * If the Cell matches the rule returns the configured topics. + */ +@InterfaceAudience.Private +public class TopicRule extends Rule { + private Set topics = new HashSet<>(); + + public TopicRule(String topics) { + this.topics.addAll(Arrays.stream(topics.split(",")).collect(Collectors.toList())); + } + + public Set getTopics() { + return topics; + } +} diff --git a/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/ProducerForTesting.java b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/ProducerForTesting.java new file mode 100755 index 0000000000..322047ef44 --- /dev/null +++ b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/ProducerForTesting.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hbase.kafka; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.ProducerFencedException; + +/** + * Mocks Kafka producer for testing + */ +public class ProducerForTesting implements Producer { + Map> messages = new HashMap<>(); + SpecificDatumReader dreader = new SpecificDatumReader<>(HBaseKafkaEvent.SCHEMA$); + + public Map> getMessages() { + return messages; + } + + @Override + public void abortTransaction() throws ProducerFencedException { + } + + @Override + public Future send(ProducerRecord producerRecord) { + try { + + BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(producerRecord.value(), null); + HBaseKafkaEvent event = dreader.read(null, decoder); + if (!messages.containsKey(producerRecord.topic())) { + messages.put(producerRecord.topic(), new ArrayList<>()); + } + messages.get(producerRecord.topic()).add(event); + return new Future() { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return false; + } + + @Override + public RecordMetadata get() throws InterruptedException, ExecutionException { + return new RecordMetadata(null, 1, 1, 1, 1, 1, 1); + } + + @Override + public RecordMetadata get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return null; + } + }; + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + + @Override + public Future send(ProducerRecord producerRecord, + Callback callback) { + return null; + } + + @Override + public void flush() { + } + + @Override + public List partitionsFor(String s) { + return null; + } + + @Override + public Map metrics() { + return null; + } + + @Override + public void close() { + } + + @Override + public void close(long l, TimeUnit timeUnit) { + } + + @Override + public void initTransactions() { + } + + @Override + public void beginTransaction() throws ProducerFencedException { + } + + @Override + public void sendOffsetsToTransaction(Map offsets, + String consumerGroupId) throws ProducerFencedException { + } + + @Override + public void commitTransaction() throws ProducerFencedException { + } +} diff --git a/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestDropRule.java 
b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestDropRule.java new file mode 100755 index 0000000000..16b9971ee6 --- /dev/null +++ b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestDropRule.java @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hbase.kafka;
+
+import java.io.ByteArrayInputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test different cases of drop rules.
+ */
+@Category(SmallTests.class)
+public class TestDropRule {
+  private static final String DROP_RULE1 =
+      "<rules><rule action=\"drop\" table=\"default:MyTable\"/></rules>";
+  private static final String DROP_RULE2 =
+      "<rules><rule action=\"drop\" table=\"default:MyTable\" columnFamily=\"data\"/></rules>";
+  private static final String DROP_RULE3 =
+      "<rules><rule action=\"drop\" table=\"default:MyTable\" "
+          + "columnFamily=\"data\" qualifier=\"dhold\"/></rules>";
+
+  private static final String DROP_RULE4 =
+      "<rules><rule action=\"drop\" table=\"default:MyTable\" "
+          + "columnFamily=\"data\" qualifier=\"dhold:*\"/></rules>";
+  private static final String DROP_RULE5 =
+      "<rules><rule action=\"drop\" table=\"default:MyTable\" "
+          + "columnFamily=\"data\" qualifier=\"*pickme\"/></rules>";
+
+  private static final String DROP_RULE6 =
+      "<rules><rule action=\"drop\" table=\"default:MyTable\" "
+          + "columnFamily=\"data\" qualifier=\"*pickme*\"/></rules>";
+
+  @Test
+  public void testDropies1() {
+    TopicRoutingRules rules = new TopicRoutingRules();
+    try {
+      rules.parseRules(new ByteArrayInputStream(DROP_RULE1.getBytes("UTF-8")));
+      Assert.assertEquals(1, rules.getDropRules().size());
+      Assert.assertEquals(TableName.valueOf("default:MyTable"),
+          rules.getDropRules().get(0).getTableName());
+      Assert.assertEquals(null, rules.getDropRules().get(0).getColumnFamily());
+      Assert.assertEquals(null, rules.getDropRules().get(0).getQualifier());
+      Assert.assertEquals(0, rules.getRouteRules().size());
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDropies2() {
+    TopicRoutingRules rules = new TopicRoutingRules();
+    try {
+      rules.parseRules(new ByteArrayInputStream(DROP_RULE2.getBytes("UTF-8")));
+      Assert.assertEquals(1, rules.getDropRules().size());
+      Assert.assertEquals(TableName.valueOf("default:MyTable"),
+          rules.getDropRules().get(0).getTableName());
+      Assert.assertTrue(
+          Bytes.equals("data".getBytes("UTF-8"), rules.getDropRules().get(0).getColumnFamily()));
+      Assert.assertEquals(null, rules.getDropRules().get(0).getQualifier());
+      Assert.assertEquals(0, rules.getRouteRules().size());
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDropies3() {
+    TopicRoutingRules rules = new TopicRoutingRules();
+    try {
+      rules.parseRules(new ByteArrayInputStream(DROP_RULE3.getBytes("UTF-8")));
+      Assert.assertEquals(1, rules.getDropRules().size());
+      Assert.assertEquals(TableName.valueOf("default:MyTable"),
+          rules.getDropRules().get(0).getTableName());
+      Assert.assertTrue(
+          Bytes.equals("data".getBytes("UTF-8"), rules.getDropRules().get(0).getColumnFamily()));
+      Assert.assertTrue(Bytes.equals(
+          "dhold".getBytes("UTF-8"), rules.getDropRules().get(0).getQualifier()));
+      Assert.assertEquals(0, rules.getRouteRules().size());
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDropies4() {
+    TopicRoutingRules rules = new TopicRoutingRules();
+    try {
+      rules.parseRules(new ByteArrayInputStream(DROP_RULE4.getBytes("UTF-8")));
+      Assert.assertEquals(1, rules.getDropRules().size());
+      Assert.assertEquals(TableName.valueOf("default:MyTable"),
+          rules.getDropRules().get(0).getTableName());
+      Assert.assertTrue(
+          Bytes.equals("data".getBytes("UTF-8"), rules.getDropRules().get(0).getColumnFamily()));
+
Assert.assertTrue( + Bytes.equals("dhold:".getBytes("UTF-8"), rules.getDropRules().get(0).getQualifier())); + Assert.assertEquals(0, rules.getRouteRules().size()); + + DropRule drop = rules.getDropRules().get(0); + Assert.assertFalse( + drop.match(TableName.valueOf("default:MyTable"), + ByteBuffer.wrap("data".getBytes("UTF-8")), + ByteBuffer.wrap("blah".getBytes("UTF-8")))); + Assert.assertFalse( + drop.match(TableName.valueOf("default:MyTable"), + ByteBuffer.wrap("data".getBytes("UTF-8")), + ByteBuffer.wrap("dholdme".getBytes("UTF-8")))); + Assert.assertTrue( + drop.match(TableName.valueOf("default:MyTable"), + ByteBuffer.wrap("data".getBytes("UTF-8")), + ByteBuffer.wrap("dhold:me".getBytes("UTF-8")))); + + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + @Test + public void testDropies5() { + TopicRoutingRules rules = new TopicRoutingRules(); + try { + rules.parseRules(new ByteArrayInputStream(DROP_RULE5.getBytes("UTF-8"))); + Assert.assertEquals(1, rules.getDropRules().size()); + Assert.assertEquals(TableName.valueOf("default:MyTable"), + rules.getDropRules().get(0).getTableName()); + Assert.assertTrue( + Bytes.equals("data".getBytes("UTF-8"), rules.getDropRules().get(0).getColumnFamily())); + Assert.assertTrue( + Bytes.equals("pickme".getBytes("UTF-8"), rules.getDropRules().get(0).getQualifier())); + Assert.assertEquals(0, rules.getRouteRules().size()); + + DropRule drop = rules.getDropRules().get(0); + Assert.assertFalse( + drop.match(TableName.valueOf("default:MyTable"), + ByteBuffer.wrap("data".getBytes("UTF-8")), + ByteBuffer.wrap("blah".getBytes("UTF-8")))); + Assert.assertFalse(drop.match(TableName.valueOf("default:MyTable"), + ByteBuffer.wrap("data".getBytes("UTF-8")), + ByteBuffer.wrap("blacickme".getBytes("UTF-8")))); + Assert.assertTrue(drop.match(TableName.valueOf("default:MyTable"), + ByteBuffer.wrap("data".getBytes("UTF-8")), + ByteBuffer.wrap("hithere.pickme".getBytes("UTF-8")))); + + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + @Test + public void testDropies6() { + TopicRoutingRules rules = new TopicRoutingRules(); + try { + rules.parseRules(new ByteArrayInputStream(DROP_RULE6.getBytes("UTF-8"))); + Assert.assertEquals(1, rules.getDropRules().size()); + Assert.assertEquals(TableName.valueOf("default:MyTable"), + rules.getDropRules().get(0).getTableName()); + Assert.assertTrue( + Bytes.equals("data".getBytes("UTF-8"), rules.getDropRules().get(0).getColumnFamily())); + Assert.assertTrue( + Bytes.equals("pickme".getBytes("UTF-8"), rules.getDropRules().get(0).getQualifier())); + Assert.assertEquals(0, rules.getRouteRules().size()); + + DropRule drop = rules.getDropRules().get(0); + Assert.assertFalse( + drop.match(TableName.valueOf("default:MyTable"), + ByteBuffer.wrap("data".getBytes("UTF-8")), + ByteBuffer.wrap("blah".getBytes("UTF-8")))); + Assert.assertFalse(drop.match(TableName.valueOf("default:MyTable"), + ByteBuffer.wrap("data".getBytes("UTF-8")), + ByteBuffer.wrap("blacickme".getBytes("UTF-8")))); + Assert.assertTrue(drop.match(TableName.valueOf("default:MyTable"), + ByteBuffer.wrap("data".getBytes("UTF-8")), + ByteBuffer.wrap("hithere.pickme".getBytes("UTF-8")))); + Assert.assertTrue(drop.match(TableName.valueOf("default:MyTable"), + ByteBuffer.wrap("data".getBytes("UTF-8")), + ByteBuffer.wrap("pickme.pleaze.do.it".getBytes("UTF-8")))); + Assert.assertFalse(drop.match(TableName.valueOf("default:MyTable"), + ByteBuffer.wrap("data".getBytes("UTF-8")), + ByteBuffer.wrap("please.pickme.pleaze".getBytes("UTF-8")))); + 
Assert.assertTrue(drop.match(TableName.valueOf("default:MyTable"), + ByteBuffer.wrap("data".getBytes("UTF-8")), + ByteBuffer.wrap("pickme.pleaze.pickme".getBytes("UTF-8")))); + + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + +} diff --git a/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestProcessMutations.java b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestProcessMutations.java new file mode 100755 index 0000000000..f7374ab4b3 --- /dev/null +++ b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestProcessMutations.java @@ -0,0 +1,367 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.kafka;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test that mutations are getting published to the topic
+ */
+@Category(SmallTests.class)
+public class TestProcessMutations {
+  private User user = new User() {
+    @Override
+    public String getShortName() {
+      return "my name";
+    }
+
+    @Override
+    public <T> T runAs(PrivilegedAction<T> action) {
+      return null;
+    }
+
+    @Override
+    public <T> T runAs(PrivilegedExceptionAction<T> action)
+        throws IOException, InterruptedException {
+      return null;
+    }
+  };
+
+  private static final String ROUTE_RULE1 =
+      "<rules><rule action=\"route\" table=\"MyNamespace:MyTable\" topic=\"foo\"/></rules>";
+
+  private static final String ROUTE_RULE2 =
+      "<rules><rule action=\"route\" table=\"MyNamespace:MyTable\" "
+          + "columnFamily=\"FAMILY\" qualifier=\"foo*\" topic=\"foo\"/></rules>";
+
+  private static final String ROUTE_RULE3 =
+      "" +
+      "" +
+      "" +
+      " " +
+      "" +
+      "" +
+      "";
+
+  ProducerForTesting myTestingProducer;
+
+  @Before
+  public void setup() {
+    this.myTestingProducer = new ProducerForTesting();
+  }
+
+  @After
+  public void tearDown() {
+  }
+
+  @Test
+  public void testSendMessage() {
+    TopicRoutingRules rules = new TopicRoutingRules();
+    try {
+      // KafkaBridgeConnection(Configuration conf, ExecutorService pool, User user,
+      //     TopicRoutingRules routingRules, Producer<byte[], byte[]> producer)
+      rules.parseRules(new ByteArrayInputStream(ROUTE_RULE1.getBytes("UTF-8")));
+      Configuration conf = new Configuration();
+      KafkaBridgeConnection connection =
+          new KafkaBridgeConnection(
+              conf, Executors.newSingleThreadExecutor(), user, rules, myTestingProducer);
+      long zeTimestamp = System.currentTimeMillis();
+      Put put = new Put("key1".getBytes("UTF-8"), zeTimestamp);
+      put.addColumn("FAMILY".getBytes("UTF-8"),
+          "not foo".getBytes("UTF-8"),
+          "VALUE should pass again".getBytes("UTF-8"));
+      put.addColumn("FAMILY".getBytes("UTF-8"),
+          "foo".getBytes("UTF-8"),
+          "VALUE should pass".getBytes("UTF-8"));
+      Table myTable = connection.getTable(TableName.valueOf("MyNamespace:MyTable"));
+      List<Row> rows = new ArrayList<>();
+      rows.add(put);
+      myTable.batch(rows, new Object[0]);
+
+      Assert.assertEquals(false, myTestingProducer.getMessages().isEmpty());
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testSendMessage1() {
+    TopicRoutingRules rules = new TopicRoutingRules();
+    try {
+      // KafkaBridgeConnection(Configuration conf, ExecutorService pool, User user,
+      //     TopicRoutingRules routingRules, Producer<byte[], byte[]> producer)
+      rules.parseRules(new ByteArrayInputStream(ROUTE_RULE2.getBytes("UTF-8")));
+      Configuration conf = new Configuration();
+      KafkaBridgeConnection connection =
+          new
KafkaBridgeConnection( + conf,Executors.newSingleThreadExecutor(),user,rules,myTestingProducer); + long zeTimestamp = System.currentTimeMillis(); + Put put = new Put("key1".getBytes("UTF-8"),zeTimestamp); + put.addColumn("FAMILY".getBytes("UTF-8"), + "not foo".getBytes("UTF-8"), + "VALUE should NOT pass".getBytes("UTF-8")); + put.addColumn("FAMILY".getBytes("UTF-8"), + "fooBarBaz".getBytes("UTF-8"), + "VALUE should pass".getBytes("UTF-8")); + Table myTable = connection.getTable(TableName.valueOf("MyNamespace:MyTable")); + List rows = new ArrayList<>(); + rows.add(put); + + myTable.batch(rows,new Object[0]); + + Assert.assertEquals(false,myTestingProducer.getMessages().isEmpty()); + Assert.assertEquals(1,myTestingProducer.getMessages().get("foo").size()); + + Assert.assertTrue(Bytes.equals( + "fooBarBaz".getBytes("UTF-8"), + myTestingProducer.getMessages().get("foo").get(0).getQualifier().array())); + + Assert.assertTrue(Bytes.equals( + "VALUE should pass".getBytes("UTF-8"), + myTestingProducer.getMessages().get("foo").get(0).getValue().array())); + + Assert.assertEquals(false, + myTestingProducer.getMessages().get("foo").get(0).getDelete()); + + Assert.assertEquals(zeTimestamp, + myTestingProducer.getMessages().get("foo").get(0).getTimestamp().longValue()); + + Assert.assertTrue(Bytes.equals( + "key1".getBytes("UTF-8"), + myTestingProducer.getMessages().get("foo").get(0).getKey().array())); + + Assert.assertTrue(Bytes.equals( + TableName.valueOf("MyNamespace:MyTable").getName(), + myTestingProducer.getMessages().get("foo").get(0).getTable().array())); + + } catch (Exception e){ + Assert.fail(e.getMessage()); + } + } + + @Test + public void testSendMessage2() { + TopicRoutingRules rules = new TopicRoutingRules(); + try { + + //Configuration conf, ExecutorService pool, User user, + // TopicRoutingRules routingRules,Producer producer + + rules.parseRules(new ByteArrayInputStream(ROUTE_RULE3.getBytes("UTF-8"))); + Configuration conf = new Configuration(); + KafkaBridgeConnection connection = + new KafkaBridgeConnection( + conf,Executors.newSingleThreadExecutor(),user,rules,myTestingProducer); + long zeTimestamp = System.currentTimeMillis(); + Put put = new Put("key1".getBytes("UTF-8"),zeTimestamp); + put.addColumn("FAMILY".getBytes("UTF-8"), + "not foo".getBytes("UTF-8"), + "VALUE should kinda pass".getBytes("UTF-8")); + put.addColumn("FAMILY".getBytes("UTF-8"), + "fooBarBaz".getBytes("UTF-8"), + "VALUE should pass".getBytes("UTF-8")); + Table myTable = connection.getTable(TableName.valueOf("MyNamespace:MyTable")); + List rows = new ArrayList<>(); + rows.add(put); + + put = new Put("key2".getBytes("UTF-8"),zeTimestamp); + put.addColumn("FAMILY1".getBytes("UTF-8"), + "not foo 1".getBytes("UTF-8"), + "VALUE should NOT pass for not foo 1".getBytes("UTF-8")); + put.addColumn("FAMILY3".getBytes("UTF-8"), + "foo 2".getBytes("UTF-8"), + "VALUE should pass for foo 2".getBytes("UTF-8")); + rows.add(put); + + + myTable.batch(rows,new Object[0]); + + Assert.assertEquals(false,myTestingProducer.getMessages().isEmpty()); + Assert.assertEquals(3,myTestingProducer.getMessages().get("foo").size()); + + Assert.assertTrue(Bytes.equals( + "fooBarBaz".getBytes("UTF-8"), + myTestingProducer.getMessages().get("foo").get(1).getQualifier().array())); + + Assert.assertTrue(Bytes.equals( + "VALUE should pass".getBytes("UTF-8"), + myTestingProducer.getMessages().get("foo").get(1).getValue().array())); + + Assert.assertEquals(false, + myTestingProducer.getMessages().get("foo").get(1).getDelete()); + + 
Assert.assertEquals(zeTimestamp, + myTestingProducer.getMessages().get("foo").get(1).getTimestamp().longValue()); + + Assert.assertTrue(Bytes.equals( + "key1".getBytes("UTF-8"), + myTestingProducer.getMessages().get("foo").get(1).getKey().array())); + + Assert.assertTrue(Bytes.equals( + TableName.valueOf("MyNamespace:MyTable").getName(), + myTestingProducer.getMessages().get("foo").get(1).getTable().array())); + + + Assert.assertTrue(Bytes.equals( + "not foo".getBytes("UTF-8"), + myTestingProducer.getMessages().get("foo").get(0).getQualifier().array())); + + Assert.assertTrue(Bytes.equals( + "VALUE should kinda pass".getBytes("UTF-8"), + myTestingProducer.getMessages().get("foo").get(0).getValue().array())); + + Assert.assertEquals(false, + myTestingProducer.getMessages().get("foo").get(0).getDelete()); + + Assert.assertEquals(zeTimestamp, + myTestingProducer.getMessages().get("foo").get(0).getTimestamp().longValue()); + + Assert.assertTrue(Bytes.equals( + "key1".getBytes("UTF-8"), + myTestingProducer.getMessages().get("foo").get(0).getKey().array())); + + Assert.assertTrue(Bytes.equals( + TableName.valueOf("MyNamespace:MyTable").getName(), + myTestingProducer.getMessages().get("foo").get(0).getTable().array())); + + + Assert.assertTrue(Bytes.equals( + "FAMILY3".getBytes("UTF-8"), + myTestingProducer.getMessages().get("foo").get(2).getFamily().array())); + + Assert.assertTrue(Bytes.equals( + "foo 2".getBytes("UTF-8"), + myTestingProducer.getMessages().get("foo").get(2).getQualifier().array())); + + Assert.assertTrue(Bytes.equals( + "VALUE should pass for foo 2".getBytes("UTF-8"), + myTestingProducer.getMessages().get("foo").get(2).getValue().array())); + + Assert.assertEquals(false, + myTestingProducer.getMessages().get("foo").get(2).getDelete()); + + Assert.assertEquals(zeTimestamp, + myTestingProducer.getMessages().get("foo").get(2).getTimestamp().longValue()); + + Assert.assertTrue(Bytes.equals( + "key2".getBytes("UTF-8"), + myTestingProducer.getMessages().get("foo").get(2).getKey().array())); + + Assert.assertTrue(Bytes.equals( + TableName.valueOf("MyNamespace:MyTable").getName(), + myTestingProducer.getMessages().get("foo").get(2).getTable().array())); + + } catch (Exception e){ + Assert.fail(e.getMessage()); + } + } + + @Test + public void testSendMessage3() { + TopicRoutingRules rules = new TopicRoutingRules(); + try { + + //Configuration conf, ExecutorService pool, User user, + // TopicRoutingRules routingRules,Producer producer + + rules.parseRules(new ByteArrayInputStream(ROUTE_RULE3.getBytes("UTF-8"))); + Configuration conf = new Configuration(); + KafkaBridgeConnection connection = + new KafkaBridgeConnection( + conf,Executors.newSingleThreadExecutor(),user,rules,myTestingProducer); + long zeTimestamp = System.currentTimeMillis(); + Delete nuke = new Delete("key1".getBytes("UTF-8"),zeTimestamp); + nuke.addColumn("FAMILY".getBytes("UTF-8"), + "probably not foo".getBytes("UTF-8")); + nuke.addColumn("FAMILY".getBytes("UTF-8"), + "fooBarBaz".getBytes("UTF-8")); + Table myTable = connection.getTable(TableName.valueOf("MyNamespace:MyTable")); + List rows = new ArrayList<>(); + rows.add(nuke); + + myTable.batch(rows,new Object[0]); + + Assert.assertEquals(false,myTestingProducer.getMessages().isEmpty()); + Assert.assertEquals(1,myTestingProducer.getMessages().get("foo1").size()); + + Assert.assertTrue(Bytes.equals( + "probably not foo".getBytes("UTF-8"), + myTestingProducer.getMessages().get("foo1").get(0).getQualifier().array())); + + Assert.assertTrue(Bytes.equals( + 
"FAMILY".getBytes("UTF-8"), + myTestingProducer.getMessages().get("foo1").get(0).getFamily().array())); + + Assert.assertTrue(Bytes.equals( + "".getBytes("UTF-8"), + myTestingProducer.getMessages().get("foo1").get(0).getValue().array())); + + Assert.assertEquals(true, + myTestingProducer.getMessages().get("foo1").get(0).getDelete()); + + Assert.assertEquals(zeTimestamp, + myTestingProducer.getMessages().get("foo1").get(0).getTimestamp().longValue()); + + Assert.assertTrue(Bytes.equals( + "key1".getBytes("UTF-8"), + myTestingProducer.getMessages().get("foo1").get(0).getKey().array())); + + Assert.assertTrue(Bytes.equals( + TableName.valueOf("MyNamespace:MyTable").getName(), + myTestingProducer.getMessages().get("foo1").get(0).getTable().array())); + + } catch (Exception e){ + Assert.fail(e.getMessage()); + } + } + +} diff --git a/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestQualifierMatching.java b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestQualifierMatching.java new file mode 100755 index 0000000000..fa2a6ca09e --- /dev/null +++ b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestQualifierMatching.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hbase.kafka; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + + +/** + * Make sure match rules work + */ +@Category(SmallTests.class) +public class TestQualifierMatching { + + @Test + public void testMatchQualfier() throws Exception { + DropRule rule = new DropRule(); + rule.setQualifier("data".getBytes("UTF-8")); + Assert.assertTrue(rule.qualifierMatch(ByteBuffer.wrap("data".getBytes("UTF-8")))); + + rule = new DropRule(); + rule.setQualifier("data1".getBytes("UTF-8")); + Assert.assertFalse(rule.qualifierMatch(ByteBuffer.wrap("data".getBytes("UTF-8")))); + + // if not set, it is a wildcard + rule = new DropRule(); + Assert.assertTrue(rule.qualifierMatch(ByteBuffer.wrap("data".getBytes("UTF-8")))); + } + + @Test + public void testStartWithQualifier() throws Exception{ + DropRule rule = new DropRule(); + rule.setQualifier("data*".getBytes("UTF-8")); + Assert.assertTrue(rule.isQualifierStartsWith()); + Assert.assertFalse(rule.isQualifierEndsWith()); + + Assert.assertTrue(rule.qualifierMatch(ByteBuffer.wrap("data".getBytes("UTF-8")))); + Assert.assertTrue(rule.qualifierMatch(ByteBuffer.wrap("data1".getBytes("UTF-8")))); + Assert.assertTrue(rule.qualifierMatch(ByteBuffer.wrap("datafoobar".getBytes("UTF-8")))); + Assert.assertFalse(rule.qualifierMatch(ByteBuffer.wrap("datfoobar".getBytes("UTF-8")))); + Assert.assertFalse(rule.qualifierMatch(ByteBuffer.wrap("d".getBytes("UTF-8")))); + Assert.assertFalse(rule.qualifierMatch(ByteBuffer.wrap("".getBytes("UTF-8")))); + } + + @Test + public void testEndsWithQualifier() throws Exception { + DropRule rule = new DropRule(); + rule.setQualifier("*data".getBytes("UTF-8")); + Assert.assertFalse(rule.isQualifierStartsWith()); + Assert.assertTrue(rule.isQualifierEndsWith()); + + Assert.assertTrue(rule.qualifierMatch(ByteBuffer.wrap("data".getBytes("UTF-8")))); + Assert.assertTrue(rule.qualifierMatch(ByteBuffer.wrap("foobardata".getBytes("UTF-8")))); + Assert.assertFalse(rule.qualifierMatch(ByteBuffer.wrap("foobardat".getBytes("UTF-8")))); + Assert.assertFalse(rule.qualifierMatch(ByteBuffer.wrap("d".getBytes("UTF-8")))); + Assert.assertFalse(rule.qualifierMatch(ByteBuffer.wrap("".getBytes("UTF-8")))); + } + +} diff --git a/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestRouteRules.java b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestRouteRules.java new file mode 100755 index 0000000000..892750b161 --- /dev/null +++ b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestRouteRules.java @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hbase.kafka;
+
+import java.io.ByteArrayInputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test topic routing rules.
+ */
+@Category(SmallTests.class)
+public class TestRouteRules {
+  private static final String ROUTE_RULE1 =
+      "<rules><rule action=\"route\" table=\"default:MyTable\" topic=\"foo\"/></rules>";
+  private static final String ROUTE_RULE2 =
+      "<rules><rule action=\"route\" table=\"default:MyTable\" "
+          + "columnFamily=\"data\" topic=\"foo\"/></rules>";
+  private static final String ROUTE_RULE3 =
+      "<rules><rule action=\"route\" table=\"default:MyTable\" "
+          + "columnFamily=\"data\" qualifier=\"dhold\" topic=\"foo\"/></rules>";
+
+  private static final String ROUTE_RULE4 =
+      "<rules><rule action=\"route\" table=\"default:MyTable\" "
+          + "columnFamily=\"data\" qualifier=\"dhold:*\" topic=\"foo\"/></rules>";
+  private static final String ROUTE_RULE5 =
+      "<rules><rule action=\"route\" table=\"default:MyTable\" "
+          + "columnFamily=\"data\" qualifier=\"*pickme\" topic=\"foo\"/></rules>";
+
+  private static final String ROUTE_RULE6 =
+      "<rules><rule action=\"route\" table=\"default:MyTable\" "
+          + "columnFamily=\"data\" qualifier=\"*pickme*\" topic=\"foo\"/></rules>";
+
+  @Test
+  public void testTopic1() {
+    TopicRoutingRules rules = new TopicRoutingRules();
+    try {
+      rules.parseRules(new ByteArrayInputStream(ROUTE_RULE1.getBytes("UTF-8")));
+      Assert.assertEquals(1, rules.getRouteRules().size());
+      Assert.assertEquals(TableName.valueOf("default:MyTable"),
+          rules.getRouteRules().get(0).getTableName());
+      Assert.assertEquals(null, rules.getRouteRules().get(0).getColumnFamily());
+      Assert.assertEquals(null, rules.getRouteRules().get(0).getQualifier());
+      Assert.assertEquals(0, rules.getDropRules().size());
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testTopic2() {
+    TopicRoutingRules rules = new TopicRoutingRules();
+    try {
+      rules.parseRules(new ByteArrayInputStream(ROUTE_RULE2.getBytes("UTF-8")));
+      Assert.assertEquals(1, rules.getRouteRules().size());
+      Assert.assertEquals(TableName.valueOf("default:MyTable"),
+          rules.getRouteRules().get(0).getTableName());
+      Assert.assertTrue(
+          Bytes.equals("data".getBytes("UTF-8"), rules.getRouteRules().get(0).getColumnFamily()));
+      Assert.assertEquals(null, rules.getRouteRules().get(0).getQualifier());
+      Assert.assertEquals(0, rules.getDropRules().size());
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testTopic3() {
+    TopicRoutingRules rules = new TopicRoutingRules();
+    try {
+      rules.parseRules(new ByteArrayInputStream(ROUTE_RULE3.getBytes("UTF-8")));
+      Assert.assertEquals(1, rules.getRouteRules().size());
+      Assert.assertEquals(TableName.valueOf("default:MyTable"),
+          rules.getRouteRules().get(0).getTableName());
+      Assert.assertTrue(
+          Bytes.equals("data".getBytes("UTF-8"), rules.getRouteRules().get(0).getColumnFamily()));
+      Assert.assertTrue(
+          Bytes.equals("dhold".getBytes("UTF-8"), rules.getRouteRules().get(0).getQualifier()));
+      Assert.assertTrue(rules.getRouteRules().get(0).getTopics().contains("foo"));
+      Assert.assertEquals(rules.getRouteRules().get(0).getTopics().size(), 1);
+
+      Assert.assertEquals(0, rules.getDropRules().size());
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testTopic4() {
+    TopicRoutingRules rules = new TopicRoutingRules();
+    try {
+      rules.parseRules(new ByteArrayInputStream(ROUTE_RULE4.getBytes("UTF-8")));
+      Assert.assertEquals(1, rules.getRouteRules().size());
+      Assert.assertEquals(TableName.valueOf("default:MyTable"),
+
rules.getRouteRules().get(0).getTableName()); + Assert.assertTrue( + Bytes.equals("data".getBytes("UTF-8"), rules.getRouteRules().get(0).getColumnFamily())); + Assert.assertTrue( + Bytes.equals("dhold:".getBytes("UTF-8"), rules.getRouteRules().get(0).getQualifier())); + Assert.assertEquals(0, rules.getDropRules().size()); + + TopicRule route = rules.getRouteRules().get(0); + Assert.assertFalse( + route.match(TableName.valueOf("default:MyTable"), + ByteBuffer.wrap("data".getBytes("UTF-8")), + ByteBuffer.wrap("blah".getBytes("UTF-8")))); + Assert.assertFalse( + route.match(TableName.valueOf("default:MyTable"), + ByteBuffer.wrap("data".getBytes("UTF-8")), + ByteBuffer.wrap("dholdme".getBytes("UTF-8")))); + Assert.assertTrue(route.match(TableName.valueOf("default:MyTable"), + ByteBuffer.wrap("data".getBytes("UTF-8")), + ByteBuffer.wrap("dhold:me".getBytes("UTF-8")))); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + @Test + public void testTopic5() { + TopicRoutingRules rules = new TopicRoutingRules(); + try { + rules.parseRules(new ByteArrayInputStream(ROUTE_RULE5.getBytes("UTF-8"))); + Assert.assertEquals(1, rules.getRouteRules().size()); + Assert.assertEquals(TableName.valueOf("default:MyTable"), + rules.getRouteRules().get(0).getTableName()); + Assert.assertTrue( + Bytes.equals("data".getBytes("UTF-8"), rules.getRouteRules().get(0).getColumnFamily())); + Assert.assertTrue( + Bytes.equals("pickme".getBytes("UTF-8"), rules.getRouteRules().get(0).getQualifier())); + Assert.assertEquals(0, rules.getDropRules().size()); + + TopicRule route = rules.getRouteRules().get(0); + Assert.assertFalse( + route.match(TableName.valueOf("default:MyTable"), + ByteBuffer.wrap("data".getBytes("UTF-8")), + ByteBuffer.wrap("blah".getBytes("UTF-8")))); + Assert.assertFalse(route.match(TableName.valueOf("default:MyTable"), + ByteBuffer.wrap("data".getBytes("UTF-8")), + ByteBuffer.wrap("blacickme".getBytes("UTF-8")))); + Assert.assertTrue(route.match(TableName.valueOf("default:MyTable"), + ByteBuffer.wrap("data".getBytes("UTF-8")), + ByteBuffer.wrap("hithere.pickme".getBytes("UTF-8")))); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + @Test + public void testTopic6() { + TopicRoutingRules rules = new TopicRoutingRules(); + try { + rules.parseRules(new ByteArrayInputStream(ROUTE_RULE6.getBytes("UTF-8"))); + Assert.assertEquals(1, rules.getRouteRules().size()); + Assert.assertEquals(TableName.valueOf("default:MyTable"), + rules.getRouteRules().get(0).getTableName()); + Assert.assertTrue( + Bytes.equals("data".getBytes("UTF-8"), + rules.getRouteRules().get(0).getColumnFamily())); + Assert.assertTrue( + Bytes.equals("pickme".getBytes("UTF-8"), + rules.getRouteRules().get(0).getQualifier())); + Assert.assertEquals(0, rules.getDropRules().size()); + + TopicRule route = rules.getRouteRules().get(0); + Assert.assertFalse( + route.match(TableName.valueOf("default:MyTable"), + ByteBuffer.wrap("data".getBytes("UTF-8")), + ByteBuffer.wrap("blah".getBytes("UTF-8")))); + Assert.assertFalse(route.match(TableName.valueOf("default:MyTable"), + ByteBuffer.wrap("data".getBytes("UTF-8")), + ByteBuffer.wrap("blacickme".getBytes("UTF-8")))); + Assert.assertTrue(route.match(TableName.valueOf("default:MyTable"), + ByteBuffer.wrap("data".getBytes("UTF-8")), + ByteBuffer.wrap("hithere.pickme".getBytes("UTF-8")))); + Assert.assertTrue(route.match(TableName.valueOf("default:MyTable"), + ByteBuffer.wrap("data".getBytes("UTF-8")), + 
ByteBuffer.wrap("pickme.pleaze.do.it".getBytes("UTF-8"))));
+      Assert.assertFalse(route.match(TableName.valueOf("default:MyTable"),
+          ByteBuffer.wrap("data".getBytes("UTF-8")),
+          ByteBuffer.wrap("please.pickme.pleaze".getBytes("UTF-8"))));
+      Assert.assertTrue(route.match(TableName.valueOf("default:MyTable"),
+          ByteBuffer.wrap("data".getBytes("UTF-8")),
+          ByteBuffer.wrap("pickme.pleaze.pickme".getBytes("UTF-8"))));
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+  }
+
+}
diff --git a/pom.xml b/pom.xml
index f65091d404..4ed14eee83 100755
--- a/pom.xml
+++ b/pom.xml
@@ -93,6 +93,8 @@
     <module>hbase-spark-it</module>
     <module>hbase-backup</module>
     <module>hbase-zookeeper</module>
+    <module>hbase-kafka-model</module>
+    <module>hbase-kafka-proxy</module>
@@ -1802,6 +1804,16 @@
         <groupId>org.apache.hbase</groupId>
         <version>${project.version}</version>
       </dependency>
+      <dependency>
+        <artifactId>hbase-kafka-model</artifactId>
+        <groupId>org.apache.hbase</groupId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <artifactId>hbase-kafka-proxy</artifactId>
+        <groupId>org.apache.hbase</groupId>
+        <version>${project.version}</version>
+      </dependency>
      <dependency>
        <artifactId>hbase-metrics-api</artifactId>
        <groupId>org.apache.hbase</groupId>
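The tests above verify the proxy side of the pipeline by decoding whatever ProducerForTesting captured. As a companion illustration only, not part of this patch, the sketch below shows what a downstream consumer of the proxy's output might look like. The broker address, consumer group, poll timeout, and the class name KafkaEventDumper are assumptions; the topic name "foo" simply matches the routing rules the tests use; the Avro decode path mirrors the one in ProducerForTesting.send().

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Properties;

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/** Illustrative sketch: reads HBaseKafkaEvent records the proxy published to a topic. */
public final class KafkaEventDumper {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
    props.put("group.id", "hbase-kafka-proxy-demo");     // assumed consumer group
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    SpecificDatumReader<HBaseKafkaEvent> reader =
        new SpecificDatumReader<>(HBaseKafkaEvent.SCHEMA$);

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Arrays.asList("foo"));           // topic used by the test routing rules
      ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
      for (ConsumerRecord<byte[], byte[]> record : records) {
        // Same decode path the tests use when verifying ProducerForTesting output.
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(record.value(), null);
        HBaseKafkaEvent event = reader.read(null, decoder);
        System.out.println(new String(event.getTable().array(), StandardCharsets.UTF_8)
            + " " + new String(event.getKey().array(), StandardCharsets.UTF_8)
            + (event.getDelete() ? " DELETE" : " PUT"));
      }
    }
  }
}

Because the payload is plain Avro described by HBaseKafkaEvent.SCHEMA$, any Kafka client that can evaluate that schema can decode the stream; no HBase client classes are required on the consumer side.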