diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TableName.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TableName.java index 63066b3..e7f3123 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TableName.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TableName.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase; +import java.io.Serializable; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Set; @@ -54,7 +55,7 @@ import org.apache.hadoop.hbase.KeyValue.KVComparator; */ @InterfaceAudience.Public @InterfaceStability.Evolving -public final class TableName implements Comparable { +public final class TableName implements Comparable, Serializable { /** See {@link #createTableNameIfNecessary(ByteBuffer, ByteBuffer)} */ private static final Set tableCache = new CopyOnWriteArraySet(); diff --git a/hbase-spark/pom.xml b/hbase-spark/pom.xml new file mode 100644 index 0000000..aa0ba36 --- /dev/null +++ b/hbase-spark/pom.xml @@ -0,0 +1,558 @@ + + + + + + 4.0.0 + + + hbase + org.apache.hbase + 2.0.0-SNAPSHOT + + hbase-spark + HBase - Spark + + + 1.3.0 + 2.10.4 + 2.10 + ${project.basedir}/.. + + + + + + javax.servlet + javax.servlet-api + 3.0.1 + test + + + + + org.scala-lang + scala-library + ${scala.version} + provided + + + org.apache.spark + spark-core_${scala.binary.version} + ${spark.version} + provided + + + org.apache.spark + spark-streaming_${scala.binary.version} + ${spark.version} + + + org.apache.spark + spark-streaming_${scala.binary.version} + ${spark.version} + test-jar + tests + + + + junit + junit + test + + + + org.scalatest + scalatest_${scala.binary.version} + 2.2.4 + test + + + + org.scalamock + scalamock-scalatest-support_${scala.binary.version} + 3.1.4 + test + + + + org.apache.hadoop + hadoop-client + ${hadoop-two.version} + + + log4j + log4j + + + javax.servlet + servlet-api + + + javax.servlet.jsp + jsp-api + + + org.jruby + jruby-complete + + + org.jboss.netty + netty + + + io.netty + netty + + + + + + org.apache.hadoop + hadoop-common + ${hadoop-two.version} + + + log4j + log4j + + + javax.servlet + servlet-api + + + javax.servlet.jsp + jsp-api + + + org.jruby + jruby-complete + + + org.jboss.netty + netty + + + io.netty + netty + + + + + + org.apache.hadoop + hadoop-common + ${hadoop-two.version} + test-jar + test + + + log4j + log4j + + + javax.servlet + servlet-api + + + javax.servlet.jsp + jsp-api + + + org.jruby + jruby-complete + + + org.jboss.netty + netty + + + io.netty + netty + + + + + + org.apache.hadoop + hadoop-hdfs + ${hadoop-two.version} + test-jar + test + + + log4j + log4j + + + javax.servlet + servlet-api + + + javax.servlet.jsp + jsp-api + + + org.jruby + jruby-complete + + + org.jboss.netty + netty + + + io.netty + netty + + + + + + org.apache.hbase + hbase-client + + + log4j + log4j + + + org.apache.thrift + thrift + + + org.jruby + jruby-complete + + + org.slf4j + slf4j-log4j12 + + + org.mortbay.jetty + jsp-2.1 + + + org.mortbay.jetty + jsp-api-2.1 + + + org.mortbay.jetty + servlet-api-2.5 + + + com.sun.jersey + jersey-core + + + com.sun.jersey + jersey-json + + + com.sun.jersey + jersey-server + + + org.mortbay.jetty + jetty + + + org.mortbay.jetty + jetty-util + + + tomcat + jasper-runtime + + + tomcat + jasper-compiler + + + org.jruby + jruby-complete + + + org.jboss.netty + netty + + + io.netty + netty + + + + + + org.apache.hbase + hbase-hadoop-compat + ${project.version} + test + test-jar + + + log4j + log4j + + + org.apache.thrift + thrift + + + org.jruby + jruby-complete 
+ + + org.slf4j + slf4j-log4j12 + + + org.mortbay.jetty + jsp-2.1 + + + org.mortbay.jetty + jsp-api-2.1 + + + org.mortbay.jetty + servlet-api-2.5 + + + com.sun.jersey + jersey-core + + + com.sun.jersey + jersey-json + + + com.sun.jersey + jersey-server + + + org.mortbay.jetty + jetty + + + org.mortbay.jetty + jetty-util + + + tomcat + jasper-runtime + + + tomcat + jasper-compiler + + + org.jruby + jruby-complete + + + org.jboss.netty + netty + + + io.netty + netty + + + + + + org.apache.hbase + hbase-hadoop2-compat + ${project.version} + test + test-jar + + + log4j + log4j + + + org.apache.thrift + thrift + + + org.jruby + jruby-complete + + + org.slf4j + slf4j-log4j12 + + + org.mortbay.jetty + jsp-2.1 + + + org.mortbay.jetty + jsp-api-2.1 + + + org.mortbay.jetty + servlet-api-2.5 + + + com.sun.jersey + jersey-core + + + com.sun.jersey + jersey-json + + + com.sun.jersey + jersey-server + + + org.mortbay.jetty + jetty + + + org.mortbay.jetty + jetty-util + + + tomcat + jasper-runtime + + + tomcat + jasper-compiler + + + org.jruby + jruby-complete + + + org.jboss.netty + netty + + + io.netty + netty + + + + + org.apache.hbase + hbase-server + ${project.version} + + + org.apache.hbase + hbase-server + ${project.version} + test + test-jar + + + org.apache.hbase + hbase-it + ${project.version} + test-jar + test + + + + + + src/test/scala + + + org.apache.maven.plugins + maven-compiler-plugin + + + + net.alchim31.maven + scala-maven-plugin + 3.2.0 + + ${project.build.sourceEncoding} + + -Xmx1024m + + ${scala.version} + + + + scala-compile-first + process-resources + + add-source + compile + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + + org.scalatest + scalatest-maven-plugin + 1.0 + + ${project.build.directory}/surefire-reports + . + WDF TestSuite.txt + + + + test + test + + test + + + true + + + + integration-test + integration-test + + test + + + true + Integration-Test + -Xmx1536m -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m + + + + + + + + \ No newline at end of file diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkDeleteExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkDeleteExample.java new file mode 100644 index 0000000..c4e55bd --- /dev/null +++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkDeleteExample.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.spark.example.hbasecontext; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.spark.JavaHBaseContext; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; + +import java.util.ArrayList; +import java.util.List; + +public class JavaHBaseBulkDeleteExample { + public static void main(String args[]) { + if (args.length == 0) { + System.out.println("JavaHBaseBulkDeleteExample {master} {tableName} "); + } + + String master = args[0]; + String tableName = args[1]; + + JavaSparkContext jsc = new JavaSparkContext(master, + "JavaHBaseBulkDeleteExample"); + jsc.addJar("spark.jar"); + + List list = new ArrayList(); + list.add(Bytes.toBytes("1")); + list.add(Bytes.toBytes("2")); + list.add(Bytes.toBytes("3")); + list.add(Bytes.toBytes("4")); + list.add(Bytes.toBytes("5")); + + JavaRDD rdd = jsc.parallelize(list); + + Configuration conf = HBaseConfiguration.create(); + conf.addResource(new Path("/etc/hbase/conf/core-site.xml")); + conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml")); + + JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf); + + hbaseContext.bulkDelete(rdd, TableName.valueOf(tableName), new DeleteFunction(), 4); + + } + + public static class DeleteFunction implements Function { + + private static final long serialVersionUID = 1L; + + public Delete call(byte[] v) throws Exception { + + return new Delete(v); + } + + } +} \ No newline at end of file diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkGetExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkGetExample.java new file mode 100644 index 0000000..dcf2c18 --- /dev/null +++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkGetExample.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.spark.example.hbasecontext; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.spark.JavaHBaseContext; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; + +public class JavaHBaseBulkGetExample { + public static void main(String args[]) { + if (args.length == 0) { + System.out + .println("JavaHBaseBulkGetExample {master} {tableName}"); + } + + String master = args[0]; + String tableName = args[1]; + + JavaSparkContext jsc = new JavaSparkContext(master, + "JavaHBaseBulkGetExample"); + jsc.addJar("spark.jar"); + + List list = new ArrayList(); + list.add(Bytes.toBytes("1")); + list.add(Bytes.toBytes("2")); + list.add(Bytes.toBytes("3")); + list.add(Bytes.toBytes("4")); + list.add(Bytes.toBytes("5")); + + JavaRDD rdd = jsc.parallelize(list); + + Configuration conf = HBaseConfiguration.create(); + conf.addResource(new Path("/etc/hbase/conf/core-site.xml")); + conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml")); + + JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf); + + hbaseContext.bulkGet(TableName.valueOf(tableName), 2, rdd, new GetFunction(), + new ResultFunction()); + } + + public static class GetFunction implements Function { + + private static final long serialVersionUID = 1L; + + public Get call(byte[] v) throws Exception { + return new Get(v); + } + } + + public static class ResultFunction implements Function { + + private static final long serialVersionUID = 1L; + + public String call(Result result) throws Exception { + Iterator it = result.listCells().iterator(); + StringBuilder b = new StringBuilder(); + + b.append(Bytes.toString(result.getRow()) + ":"); + + while (it.hasNext()) { + Cell cell = it.next(); + String q = Bytes.toString(cell.getQualifierArray()); + if (q.equals("counter")) { + b.append("(" + Bytes.toString(cell.getQualifierArray()) + "," + + Bytes.toLong(cell.getValueArray()) + ")"); + } else { + b.append("(" + Bytes.toString(cell.getQualifierArray()) + "," + + Bytes.toString(cell.getValueArray()) + ")"); + } + } + return b.toString(); + } + } +} \ No newline at end of file diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkPutExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkPutExample.java new file mode 100644 index 0000000..dee5cf0 --- /dev/null +++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkPutExample.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.spark.example.hbasecontext; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.spark.JavaHBaseContext; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; + +public class JavaHBaseBulkPutExample { + public static void main(String args[]) { + if (args.length == 0) { + System.out + .println("JavaHBaseBulkPutExample {master} {tableName} {columnFamily}"); + } + + String master = args[0]; + String tableName = args[1]; + String columnFamily = args[2]; + + JavaSparkContext jsc = new JavaSparkContext(master, + "JavaHBaseBulkPutExample"); + jsc.addJar("spark.jar"); + + List list = new ArrayList(); + list.add("1," + columnFamily + ",a,1"); + list.add("2," + columnFamily + ",a,2"); + list.add("3," + columnFamily + ",a,3"); + list.add("4," + columnFamily + ",a,4"); + list.add("5," + columnFamily + ",a,5"); + + JavaRDD rdd = jsc.parallelize(list); + + Configuration conf = HBaseConfiguration.create(); + conf.addResource(new Path("/etc/hbase/conf/core-site.xml")); + conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml")); + + JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf); + + hbaseContext.bulkPut(rdd, TableName.valueOf(tableName), new PutFunction(), true); + } + + public static class PutFunction implements Function { + + private static final long serialVersionUID = 1L; + + public Put call(String v) throws Exception { + String[] cells = v.split(","); + Put put = new Put(Bytes.toBytes(cells[0])); + + put.addColumn(Bytes.toBytes(cells[1]), Bytes.toBytes(cells[2]), + Bytes.toBytes(cells[3])); + return put; + } + + } +} \ No newline at end of file diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseDistributedScan.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseDistributedScan.java new file mode 100644 index 0000000..88159b2 --- /dev/null +++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseDistributedScan.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.spark.example.hbasecontext; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.spark.JavaHBaseContext; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; + +import scala.Tuple2; +import scala.Tuple3; + +public class JavaHBaseDistributedScan { + public static void main(String args[]) { + if (args.length == 0) { + System.out + .println("JavaHBaseDistributedScan {master} {tableName}"); + } + + String master = args[0]; + String tableName = args[1]; + + JavaSparkContext jsc = new JavaSparkContext(master, + "JavaHBaseDistributedScan"); + jsc.addJar("spark.jar"); + + + Configuration conf = HBaseConfiguration.create(); + conf.addResource(new Path("/etc/hbase/conf/core-site.xml")); + conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml")); + + JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf); + + Scan scan = new Scan(); + scan.setCaching(100); + + JavaRDD>>> javaRdd = + hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan); + + List>>> results = javaRdd.collect(); + + results.size(); + } +} \ No newline at end of file diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseMapGetPutExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseMapGetPutExample.java new file mode 100644 index 0000000..167590b --- /dev/null +++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseMapGetPutExample.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.spark.example.hbasecontext; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.spark.JavaHBaseContext; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.VoidFunction; + +import scala.Tuple2; + +public class JavaHBaseMapGetPutExample { + public static void main(String args[]) { + if (args.length == 0) { + System.out + .println("JavaHBaseBulkGetExample {master} {tableName}"); + } + + String master = args[0]; + String tableName = args[1]; + + JavaSparkContext jsc = new JavaSparkContext(master, + "JavaHBaseBulkGetExample"); + jsc.addJar("spark.jar"); + + List list = new ArrayList(); + list.add(Bytes.toBytes("1")); + list.add(Bytes.toBytes("2")); + list.add(Bytes.toBytes("3")); + list.add(Bytes.toBytes("4")); + list.add(Bytes.toBytes("5")); + + //All Spark + JavaRDD rdd = jsc.parallelize(list); + + //All HBase + Configuration conf = HBaseConfiguration.create(); + conf.addResource(new Path("/etc/hbase/conf/core-site.xml")); + conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml")); + + //This is me + JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf); + + //This is me + hbaseContext.foreachPartition(rdd, null); + + hbaseContext.foreach(rdd, new VoidFunction>() { + + + public void call(Tuple2 t) + throws Exception { + Table table1 = t._2().getTable(TableName.valueOf("Foo")); + + byte[] b = t._1(); + Result r = table1.get(new Get(b)); + if (r.getExists()) { + table1.put(new Put(b)); + } + + } + }); + } + + public static class GetFunction implements Function { + private static final long serialVersionUID = 1L; + public Get call(byte[] v) throws Exception { + return new Get(v); + } + } + + public static class CustomFunction implements VoidFunction, HConnection>> { + public void call(Tuple2, HConnection> t) throws Exception { + } + } +} diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseStreamingBulkPutExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseStreamingBulkPutExample.java new file mode 100644 index 0000000..b88a644 --- /dev/null +++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseStreamingBulkPutExample.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.spark.example.hbasecontext; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.spark.JavaHBaseContext; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; + +public class JavaHBaseStreamingBulkPutExample { + public static void main(String args[]) { + if (args.length == 0) { + System.out + .println("JavaHBaseBulkPutExample {master} {host} {post} {tableName} {columnFamily}"); + } + + String master = args[0]; + String host = args[1]; + String port = args[2]; + String tableName = args[3]; + String columnFamily = args[4]; + + System.out.println("master:" + master); + System.out.println("host:" + host); + System.out.println("port:" + Integer.parseInt(port)); + System.out.println("tableName:" + tableName); + System.out.println("columnFamily:" + columnFamily); + + SparkConf sparkConf = new SparkConf(); + sparkConf.set("spark.cleaner.ttl", "120000"); + + JavaSparkContext jsc = new JavaSparkContext(master, + "JavaHBaseBulkPutExample"); + jsc.addJar("spark.jar"); + + JavaStreamingContext jssc = new JavaStreamingContext(jsc, new Duration(1000)); + + JavaReceiverInputDStream javaDstream = jssc.socketTextStream(host, Integer.parseInt(port)); + + Configuration conf = HBaseConfiguration.create(); + conf.addResource(new Path("/etc/hbase/conf/core-site.xml")); + conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml")); + + JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf); + + hbaseContext.streamBulkPut(javaDstream, TableName.valueOf(tableName), new PutFunction(), true); + } + + public static class PutFunction implements Function { + + private static final long serialVersionUID = 1L; + + public Put call(String v) throws Exception { + String[] cells = v.split(","); + Put put = new Put(Bytes.toBytes(cells[0])); + + put.add(Bytes.toBytes(cells[1]), Bytes.toBytes(cells[2]), + Bytes.toBytes(cells[3])); + return put; + } + + } +} diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala new file mode 100644 index 0000000..780fc65 --- /dev/null +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala @@ -0,0 +1,610 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.spark + +import org.apache.hadoop.hbase.{CellUtil, TableName} +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.rdd.RDD +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hbase.client.ConnectionFactory +import org.apache.hadoop.hbase.client.Scan +import org.apache.hadoop.hbase.client.Get +import java.util.ArrayList +import org.apache.hadoop.hbase.client.Result +import scala.reflect.ClassTag +import org.apache.hadoop.hbase.client.Connection +import org.apache.hadoop.hbase.client.Put +import org.apache.hadoop.hbase.client.Increment +import org.apache.hadoop.hbase.client.Delete +import org.apache.spark.{Logging, SerializableWritable, SparkConf, SparkContext} +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.mapreduce.Job +import org.apache.hadoop.hbase.client.Mutation +import org.apache.spark.streaming.dstream.DStream +import java.io._ +import org.apache.hadoop.security.{Credentials, UserGroupInformation} +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod +import org.apache.hadoop.hbase.mapreduce.TableInputFormat +import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper +import org.apache.hadoop.fs.{Path, FileSystem} + +/** + * HBaseContext is a façade of simple and complex HBase operations + * like bulk put, get, increment, delete, and scan + * + * HBase Context will take the responsibilities to happen to + * complexity of disseminating the configuration information + * to the working and managing the life cycle of HConnections. + * + * serializable Configuration object + * + */ +class HBaseContext(@transient sc: SparkContext, + @transient config: Configuration, + val tmpHdfsConfgFile: String = null) extends Serializable with Logging { + + @transient var credentials = SparkHadoopUtil.get.getCurrentUserCredentials() + @transient var tmpHdfsConfiguration:Configuration = config + @transient var appliedCredentials = false; + @transient val job = new Job(config) + TableMapReduceUtil.initCredentials(job) + val broadcastedConf = sc.broadcast(new SerializableWritable(config)) + val credentialsConf = sc.broadcast(new SerializableWritable(job.getCredentials())) + + if (tmpHdfsConfgFile != null && config != null) { + val fs = FileSystem.newInstance(config) + val tmpPath = new Path(tmpHdfsConfgFile) + if (!fs.exists(tmpPath)) { + val outputStream = fs.create(tmpPath) + config.write(outputStream) + outputStream.close(); + } else { + logWarning("tmpHdfsConfigDir " + tmpHdfsConfgFile + " exist!!") + } + } + + /** + * A simple enrichment of the traditional Spark RDD foreachPartition. + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. 
All HConnection + * management is handled outside this method + * + * @param rdd Original RDD with data to iterate over + * @param f Function to be given a iterator to iterate through + * the RDD values and a HConnection object to interact + * with HBase + */ + def foreachPartition[T](rdd: RDD[T], + f: (Iterator[T], Connection) => Unit) = { + rdd.foreachPartition( + it => hbaseForeachPartition(broadcastedConf, it, f)) + } + + /** + * A simple enrichment of the traditional Spark Streaming dStream foreach + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. All HConnection + * management is handled outside this method + * + * @param dstream Original DStream with data to iterate over + * @param f Function to be given a iterator to iterate through + * the DStream values and a HConnection object to + * interact with HBase + */ + def foreachRDD[T](dstream: DStream[T], + f: (Iterator[T], Connection) => Unit) = { + dstream.foreach((rdd, time) => { + foreachPartition(rdd, f) + }) + } + + /** + * A simple enrichment of the traditional Spark RDD mapPartition. + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. All HConnection + * management is handled outside this method + * + * Note: Make sure to partition correctly to avoid memory issue when + * getting data from HBase + * + * @param rdd Original RDD with data to iterate over + * @param mp Function to be given a iterator to iterate through + * the RDD values and a HConnection object to interact + * with HBase + * @return Returns a new RDD generated by the user definition + * function just like normal mapPartition + */ + def mapPartition[T, R: ClassTag](rdd: RDD[T], + mp: (Iterator[T], Connection) => Iterator[R]): RDD[R] = { + + rdd.mapPartitions[R](it => hbaseMapPartition[T, R](broadcastedConf, + it, + mp), true) + } + + /** + * A simple enrichment of the traditional Spark Streaming DStream + * foreachPartition. + * + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. All HConnection + * management is handled outside this method + * + * Note: Make sure to partition correctly to avoid memory issue when + * getting data from HBase + * + * @param dstream Original DStream with data to iterate over + * @param f Function to be given a iterator to iterate through + * the DStream values and a HConnection object to + * interact with HBase + * @return Returns a new DStream generated by the user + * definition function just like normal mapPartition + */ + def streamForeachRDD[T](dstream: DStream[T], + f: (Iterator[T], Connection) => Unit): Unit = { + + dstream.foreachRDD(rdd => ( + this.foreachPartition(rdd, f) + )) + } + + /** + * A simple enrichment of the traditional Spark Streaming DStream + * mapPartition. + * + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. 
All HConnection + * management is handled outside this method + * + * Note: Make sure to partition correctly to avoid memory issue when + * getting data from HBase + * + * @param dstream Original DStream with data to iterate over + * @param mp Function to be given a iterator to iterate through + * the DStream values and a HConnection object to + * interact with HBase + * @return Returns a new DStream generated by the user + * definition function just like normal mapPartition + */ + def streamMapPartition[T, U: ClassTag](dstream: DStream[T], + mp: (Iterator[T], Connection) => Iterator[U]): DStream[U] = { + + dstream.mapPartitions(it => hbaseMapPartition[T, U]( + broadcastedConf, + it, + mp), true) + } + + /** + * A simple abstraction over the HBaseContext.foreachPartition method. + * + * It allow addition support for a user to take RDD + * and generate puts and send them to HBase. + * The complexity of managing the HConnection is + * removed from the developer + * + * @param rdd Original RDD with data to iterate over + * @param tableName The name of the table to put into + * @param f Function to convert a value in the RDD to a HBase Put + * @param autoFlush If autoFlush should be turned on + */ + def bulkPut[T](rdd: RDD[T], tableName: TableName, f: (T) => Put, autoFlush: Boolean) { + + rdd.foreachPartition( + it => hbaseForeachPartition[T]( + broadcastedConf, + it, + (iterator, connection) => { + val m = connection.getBufferedMutator(tableName) + iterator.foreach(T => m.mutate(f(T))) + m.flush() + m.close() + })) + } + + def applyCreds[T] (configBroadcast: Broadcast[SerializableWritable[Configuration]]){ + + + credentials = SparkHadoopUtil.get.getCurrentUserCredentials() + + logInfo("appliedCredentials:" + appliedCredentials + ",credentials:" + credentials); + + if (appliedCredentials == false && credentials != null) { + appliedCredentials = true + logCredInformation(credentials) + + @transient val ugi = UserGroupInformation.getCurrentUser(); + ugi.addCredentials(credentials) + // specify that this is a proxy user + ugi.setAuthenticationMethod(AuthenticationMethod.PROXY) + + ugi.addCredentials(credentialsConf.value.value) + } + } + + def logCredInformation[T] (credentials2:Credentials) { + val it = credentials2.getAllTokens.iterator(); + } + + /** + * A simple abstraction over the HBaseContext.streamMapPartition method. + * + * It allow addition support for a user to take a DStream and + * generate puts and send them to HBase. + * + * The complexity of managing the HConnection is + * removed from the developer + * + * @param dstream Original DStream with data to iterate over + * @param tableName The name of the table to put into + * @param f Function to convert a value in + * the DStream to a HBase Put + * @param autoFlush If autoFlush should be turned on + */ + def streamBulkPut[T](dstream: DStream[T], + tableName: TableName, + f: (T) => Put, + autoFlush: Boolean) = { + dstream.foreach((rdd, time) => { + bulkPut(rdd, tableName, f, autoFlush) + }) + } + + /** + * A simple abstraction over the HBaseContext.foreachPartition method. + * + * It allow addition support for a user to take a RDD and generate delete + * and send them to HBase. 
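+ * For example, a minimal sketch of the intended call pattern (hypothetical:
+ * assumes an existing SparkContext `sc`, an HBaseContext built from it, and a
+ * table named "t1"):
+ * {{{
+ *   val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())
+ *   // RDD of row keys to delete
+ *   val rowKeys = sc.parallelize(Seq(Bytes.toBytes("1"), Bytes.toBytes("2"), Bytes.toBytes("3")))
+ *   hbaseContext.bulkDelete[Array[Byte]](rowKeys,
+ *     TableName.valueOf("t1"),
+ *     rowKey => new Delete(rowKey), // convert each row key into a Delete
+ *     4)                            // batch size
+ * }}}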
The complexity of managing the HConnection is + * removed from the developer + * + * @param rdd Original RDD with data to iterate over + * @param tableName The name of the table to delete from + * @param f Function to convert a value in the RDD to a + * HBase Deletes + * @param batchSize The number of delete to batch before sending to HBase + */ + def bulkDelete[T](rdd: RDD[T], tableName: TableName, f: (T) => Delete, batchSize: Integer) { + bulkMutation(rdd, tableName, f, batchSize) + } + + /** + * A simple abstraction over the HBaseContext.streamBulkMutation method. + * + * It allow addition support for a user to take a DStream and + * generate Increments and send them to HBase. + * + * The complexity of managing the HConnection is + * removed from the developer + * + * @param dstream Original DStream with data to iterate over + * @param tableName The name of the table to increments into + * @param f Function to convert a value in the DStream to a + * HBase Increments + * @param batchSize The number of increments to batch before sending to HBase + */ + def streamBulkIncrement[T](dstream: DStream[T], + tableName: TableName, + f: (T) => Increment, + batchSize: Int) = { + streamBulkMutation(dstream, tableName, f, batchSize) + } + + /** + * A simple abstraction over the HBaseContext.streamBulkMutation method. + * + * It allow addition support for a user to take a DStream and + * generate Delete and send them to HBase. + * + * The complexity of managing the HConnection is + * removed from the developer + * + * @param dstream Original DStream with data to iterate over + * @param tableName The name of the table to delete from + * @param f function to convert a value in the DStream to a + * HBase Delete + * @param batchSize The number of deletes to batch before sending to HBase + */ + def streamBulkDelete[T](dstream: DStream[T], + tableName: TableName, + f: (T) => Delete, + batchSize: Integer) = { + streamBulkMutation(dstream, tableName, f, batchSize) + } + + /** + * Under lining function to support all bulk mutations + * + * May be opened up if requested + */ + private def bulkMutation[T](rdd: RDD[T], tableName: TableName, f: (T) => Mutation, batchSize: Integer) { + rdd.foreachPartition( + it => hbaseForeachPartition[T]( + broadcastedConf, + it, + (iterator, connection) => { + val table = connection.getTable(tableName) + val mutationList = new ArrayList[Mutation] + iterator.foreach(T => { + mutationList.add(f(T)) + if (mutationList.size >= batchSize) { + table.batch(mutationList, null) + mutationList.clear() + } + }) + if (mutationList.size() > 0) { + table.batch(mutationList, null) + mutationList.clear() + } + table.close() + })) + } + + /** + * Under lining function to support all bulk streaming mutations + * + * May be opened up if requested + */ + private def streamBulkMutation[T](dstream: DStream[T], + tableName: TableName, + f: (T) => Mutation, + batchSize: Integer) = { + dstream.foreach((rdd, time) => { + bulkMutation(rdd, tableName, f, batchSize) + }) + } + + /** + * A simple abstraction over the HBaseContext.mapPartition method. 
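+ * A hypothetical Scala sketch of how it can be invoked (the SparkContext `sc`
+ * and the table "t1" are assumptions for illustration; the behaviour is
+ * described below):
+ * {{{
+ *   val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())
+ *   val rowKeys = sc.parallelize(Seq(Bytes.toBytes("1"), Bytes.toBytes("2")))
+ *   // RDD[String] holding one line per row fetched from HBase
+ *   val rows = hbaseContext.bulkGet[Array[Byte], String](
+ *     TableName.valueOf("t1"),
+ *     2,          // batch size
+ *     rowKeys,
+ *     rowKey => new Get(rowKey),
+ *     (result: Result) => Bytes.toString(result.getRow))
+ * }}}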
+ * + * It allow addition support for a user to take a RDD and generates a + * new RDD based on Gets and the results they bring back from HBase + * + * @param rdd Original RDD with data to iterate over + * @param tableName The name of the table to get from + * @param makeGet function to convert a value in the RDD to a + * HBase Get + * @param convertResult This will convert the HBase Result object to + * what ever the user wants to put in the resulting + * RDD + * return new RDD that is created by the Get to HBase + */ + def bulkGet[T, U](tableName: TableName, + batchSize: Integer, + rdd: RDD[T], + makeGet: (T) => Get, + convertResult: (Result) => U): RDD[U] = { + + val getMapPartition = new GetMapPartition(tableName, + batchSize, + makeGet, + convertResult) + + rdd.mapPartitions[U](it => + hbaseMapPartition[T, U]( + broadcastedConf, + it, + getMapPartition.run), true)(fakeClassTag[U]) + } + + /** + * A simple abstraction over the HBaseContext.streamMap method. + * + * It allow addition support for a user to take a DStream and + * generates a new DStream based on Gets and the results + * they bring back from HBase + * + * @param dstream Original DStream with data to iterate over + * @param tableName The name of the table to get from + * @param makeGet function to convert a value in the DStream to a + * HBase Get + * @param convertResult This will convert the HBase Result object to + * what ever the user wants to put in the resulting + * DStream + * return new DStream that is created by the Get to HBase + */ + def streamBulkGet[T, U: ClassTag](tableName: TableName, + batchSize: Integer, + dstream: DStream[T], + makeGet: (T) => Get, + convertResult: (Result) => U): DStream[U] = { + + val getMapPartition = new GetMapPartition(tableName, + batchSize, + makeGet, + convertResult) + + dstream.mapPartitions[U](it => hbaseMapPartition[T, U]( + broadcastedConf, + it, + getMapPartition.run), true) + } + + /** + * This function will use the native HBase TableInputFormat with the + * given scan object to generate a new RDD + * + * @param tableName the name of the table to scan + * @param scan the HBase scan object to use to read data from HBase + * @param f function to convert a Result object from HBase into + * what the user wants in the final generated RDD + * @return new RDD with results from scan + */ + def hbaseRDD[U: ClassTag](tableName: TableName, scan: Scan, f: ((ImmutableBytesWritable, Result)) => U): RDD[U] = { + + var job: Job = new Job(getConf(broadcastedConf)) + + TableMapReduceUtil.initCredentials(job) + TableMapReduceUtil.initTableMapperJob(tableName, scan, classOf[IdentityTableMapper], null, null, job) + + sc.newAPIHadoopRDD(job.getConfiguration(), + classOf[TableInputFormat], + classOf[ImmutableBytesWritable], + classOf[Result]).map(f) + } + + /** + * A overloaded version of HBaseContext hbaseRDD that predefines the + * type of the outputing RDD + * + * @param tableName the name of the table to scan + * @param scans the HBase scan object to use to read data from HBase + * @return New RDD with results from scan + * + */ + def hbaseRDD(tableName: TableName, scans: Scan): + RDD[(Array[Byte], java.util.List[(Array[Byte], Array[Byte], Array[Byte])])] = { + + hbaseRDD[(Array[Byte], java.util.List[(Array[Byte], Array[Byte], Array[Byte])])]( + tableName, + scans, + (r: (ImmutableBytesWritable, Result)) => { + val it = r._2.listCells().iterator() + val list = new ArrayList[(Array[Byte], Array[Byte], Array[Byte])]() + + while (it.hasNext()) { + val cell = it.next() + 
list.add((CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), CellUtil.cloneValue(cell))) + } + + (r._1.copyBytes(), list) + }) + } + + /** + * Under lining wrapper all foreach functions in HBaseContext + * + */ + private def hbaseForeachPartition[T]( + configBroadcast: Broadcast[SerializableWritable[Configuration]], + it: Iterator[T], + f: (Iterator[T], Connection) => Unit) = { + + val config = getConf(configBroadcast) + + applyCreds(configBroadcast) + // specify that this is a proxy user + val connection = ConnectionFactory.createConnection(config) + f(it, connection) + connection.close() + } + + private def getConf(configBroadcast: Broadcast[SerializableWritable[Configuration]]): Configuration = { + + if (tmpHdfsConfiguration != null) { + tmpHdfsConfiguration + } else if (tmpHdfsConfgFile != null) { + + val fs = FileSystem.newInstance(SparkHadoopUtil.get.conf) + val inputStream = fs.open(new Path(tmpHdfsConfgFile)) + tmpHdfsConfiguration = new Configuration(false) + tmpHdfsConfiguration.readFields(inputStream) + inputStream.close() + + tmpHdfsConfiguration + } + + if (tmpHdfsConfiguration == null) { + try { + tmpHdfsConfiguration = configBroadcast.value.value + tmpHdfsConfiguration + } catch { + case ex: Exception =>{ + println("Unable to getConfig from broadcast") + } + } + } + tmpHdfsConfiguration + } + + /** + * Under lining wrapper all mapPartition functions in HBaseContext + * + */ + private def hbaseMapPartition[K, U]( + configBroadcast: Broadcast[SerializableWritable[Configuration]], + it: Iterator[K], + mp: (Iterator[K], Connection) => Iterator[U]): Iterator[U] = { + + val config = getConf(configBroadcast) + applyCreds(configBroadcast) + val hConnection = ConnectionFactory.createConnection(config) + + val res = mp(it, hConnection) + hConnection.close() + res + } + + /** + * Under lining wrapper all get mapPartition functions in HBaseContext + */ + private class GetMapPartition[T, U](tableName: TableName, + batchSize: Integer, + makeGet: (T) => Get, + convertResult: (Result) => U) extends Serializable { + + def run(iterator: Iterator[T], connection: Connection): Iterator[U] = { + val table = connection.getTable(tableName) + + val gets = new ArrayList[Get]() + var res = List[U]() + + while (iterator.hasNext) { + gets.add(makeGet(iterator.next)) + + if (gets.size() == batchSize) { + var results = table.get(gets) + res = res ++ results.map(convertResult) + gets.clear() + } + } + if (gets.size() > 0) { + val results = table.get(gets) + res = res ++ results.map(convertResult) + gets.clear() + } + table.close() + res.iterator + } + } + + /** + * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef]. + * + * This method is used to keep ClassTags out of the external Java API, as the Java compiler + * cannot produce them automatically. While this ClassTag-faking does please the compiler, + * it can cause problems at runtime if the Scala API relies on ClassTags for correctness. + * + * Often, though, a ClassTag[AnyRef] will not lead to incorrect behavior, just worse performance + * or security issues. For instance, an Array[AnyRef] can hold any type T, but may lose primitive + * specialization. 
+ */ + private[spark] + def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]] +} diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctions.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctions.scala new file mode 100644 index 0000000..790c512 --- /dev/null +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctions.scala @@ -0,0 +1,28 @@ +package org.apache.hadoop.hbase.spark + +import org.apache.hadoop.hbase.TableName +import org.apache.hadoop.hbase.client._ +import org.apache.spark.rdd.RDD +import org.apache.spark.streaming.dstream.DStream + +import scala.reflect.ClassTag + +object HBaseDStreamFunctions { + implicit class GenericHBaseDStreamFunctions[T](val dStream: DStream[T]) { + def hbaseBulkPut(hc: HBaseContext, tableName: TableName, f: (T) => Put, autoFlush: Boolean): Unit = { + hc.streamBulkPut(dStream, tableName, f, autoFlush) + } + def hbaseBulkGet(hc: HBaseContext, tableName: TableName, batchSize:Int, f: (T) => Get, convertResult: (Result) => Unit): Unit = { + hc.streamBulkGet(tableName, batchSize, dStream, f, convertResult) + } + def hbaseBulkDelete(hc: HBaseContext, tableName: TableName, f:(T) => Delete, batchSize:Int): Unit = { + hc.streamBulkDelete(dStream, tableName, f, batchSize) + } + def hbaseForeachRDD(hc: HBaseContext, f: (Iterator[T], Connection) => Unit): Unit = { + hc.streamForeachRDD(dStream, f) + } + def hbaseMapPartition[R: ClassTag](hc: HBaseContext, f: (Iterator[T], Connection) => Iterator[R]): DStream[R] = { + hc.streamMapPartition(dStream, f) + } + } +} diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala new file mode 100644 index 0000000..7445a16 --- /dev/null +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.spark + +import org.apache.hadoop.hbase.TableName +import org.apache.hadoop.hbase.client._ +import org.apache.spark.rdd.RDD + +import scala.reflect.ClassTag + +object HBaseRDDFunctions +{ + implicit class GenericHBaseRDDFunctions[T](val rdd: RDD[T]) { + def hbaseBulkPut(hc: HBaseContext, tableName: TableName, f: (T) => Put, autoFlush: Boolean): Unit = { + hc.bulkPut(rdd, tableName, f, autoFlush) + } + def hbaseBulkGet[R: ClassTag](hc: HBaseContext, + tableName: TableName, batchSize:Int, + f: (T) => Get, convertResult: (Result) => R): RDD[R] = { + hc.bulkGet[T, R](tableName, batchSize, rdd, f, convertResult) + } + def hbaseBulkDelete(hc: HBaseContext, tableName: TableName, f:(T) => Delete, batchSize:Int): Unit = { + hc.bulkDelete(rdd, tableName, f, batchSize) + } + def hbaseForeachPartition(hc: HBaseContext, f: (Iterator[T], Connection) => Unit): Unit = { + hc.foreachPartition(rdd, f) + } + def hbaseMapPartition[R: ClassTag](hc: HBaseContext, f: (Iterator[T], Connection) => Iterator[R]): RDD[R] = { + hc.mapPartition[T,R](rdd, f) + } + } +} diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala new file mode 100644 index 0000000..d265a06 --- /dev/null +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.spark + +import org.apache.hadoop.hbase.TableName +import org.apache.spark.api.java.JavaSparkContext +import org.apache.hadoop.conf.Configuration +import org.apache.spark.api.java.JavaRDD +import org.apache.spark.api.java.function.VoidFunction +import org.apache.spark.api.java.function.Function +import org.apache.hadoop.hbase.client.Connection +import org.apache.spark.streaming.api.java.JavaDStream +import org.apache.spark.api.java.function.FlatMapFunction +import scala.collection.JavaConversions._ +import org.apache.hadoop.hbase.client.Put +import org.apache.hadoop.hbase.client.Increment +import org.apache.hadoop.hbase.client.Delete +import org.apache.hadoop.hbase.client.Get +import org.apache.hadoop.hbase.client.Result +import org.apache.hadoop.hbase.client.Scan +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import scala.reflect.ClassTag + +class JavaHBaseContext(@transient jsc: JavaSparkContext, + @transient config: Configuration) extends Serializable { + val hbc = new HBaseContext(jsc.sc, config) + + /** + * A simple enrichment of the traditional Spark javaRdd foreachPartition. 
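+ * A hypothetical Java sketch of a call (the JavaRDD<byte[]> `rdd` of row keys,
+ * the `hbaseContext`, and the table "t1" with column family "cf" are
+ * assumptions for illustration; the behaviour is described below):
+ * {{{
+ *   hbaseContext.foreachPartition(rdd,
+ *     new VoidFunction<Tuple2<Iterator<byte[]>, Connection>>() {
+ *       public void call(Tuple2<Iterator<byte[]>, Connection> t) throws Exception {
+ *         Table table = t._2().getTable(TableName.valueOf("t1"));
+ *         while (t._1().hasNext()) {
+ *           Put put = new Put(t._1().next());
+ *           put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("value"));
+ *           table.put(put);
+ *         }
+ *         table.close(); // close the Table, but never the shared Connection
+ *       }
+ *     });
+ * }}}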
+ * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. All HConnection + * management is handled outside this method + * + * @param javaRdd Original javaRdd with data to iterate over + * @param f Function to be given a iterator to iterate through + * the RDD values and a HConnection object to interact + * with HBase + */ + def foreachPartition[T](javaRdd: JavaRDD[T], + f: VoidFunction[(java.util.Iterator[T], Connection)] ) = { + + hbc.foreachPartition(javaRdd.rdd, + (iterator:Iterator[T], hConnection) => + { f.call((iterator, hConnection))}) + } + + def foreach[T](javaRdd: JavaRDD[T], + f: VoidFunction[(T, Connection)] ) = { + + hbc.foreachPartition(javaRdd.rdd, + (iterator:Iterator[T], hConnection) => + iterator.foreach(a => f.call((a, hConnection)))) + + //{ f.call((iterator, hConnection))}) + } + + /** + * A simple enrichment of the traditional Spark Streaming dStream foreach + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. All HConnection + * management is handled outside this method + * + * @param javaDstream Original DStream with data to iterate over + * @param f Function to be given a iterator to iterate through + * the JavaDStream values and a HConnection object to + * interact with HBase + */ + def foreachRDD[T](javaDstream: JavaDStream[T], + f: VoidFunction[(Iterator[T], Connection)]) = { + hbc.foreachRDD(javaDstream.dstream, (it:Iterator[T], hc: Connection) => f.call(it, hc)) + } + + /** + * A simple enrichment of the traditional Spark JavaRDD mapPartition. + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. All HConnection + * management is handled outside this method + * + * Note: Make sure to partition correctly to avoid memory issue when + * getting data from HBase + * + * @param javaRdd Original JavaRdd with data to iterate over + * @param mp Function to be given a iterator to iterate through + * the RDD values and a HConnection object to interact + * with HBase + * @return Returns a new RDD generated by the user definition + * function just like normal mapPartition + */ + def mapPartition[T,R](javaRdd: JavaRDD[T], + mp: FlatMapFunction[(java.util.Iterator[T], Connection),R] ): JavaRDD[R] = { + + def fn = (x: Iterator[T], connection: Connection) => + asScalaIterator( + mp.call((asJavaIterator(x), connection)).iterator() + ) + + JavaRDD.fromRDD(hbc.mapPartition(javaRdd.rdd, + (iterator:Iterator[T], connection:Connection) => + fn(iterator, connection))(fakeClassTag[R]))(fakeClassTag[R]) + } + + /** + * A simple enrichment of the traditional Spark Streaming JavaDStream + * mapPartition. + * + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. 
All HConnection + * management is handled outside this method + * + * Note: Make sure to partition correctly to avoid memory issue when + * getting data from HBase + * + * @param javaDstream Original JavaDStream with data to iterate over + * @param mp Function to be given a iterator to iterate through + * the JavaDStream values and a HConnection object to + * interact with HBase + * @return Returns a new JavaDStream generated by the user + * definition function just like normal mapPartition + */ + def streamMap[T, U](javaDstream: JavaDStream[T], + mp: Function[(Iterator[T], Connection), Iterator[U]]): JavaDStream[U] = { + JavaDStream.fromDStream(hbc.streamMapPartition(javaDstream.dstream, + (it: Iterator[T], hc: Connection) => + mp.call(it, hc) )(fakeClassTag[U]))(fakeClassTag[U]) + } + + /** + * A simple abstraction over the HBaseContext.foreachPartition method. + * + * It allow addition support for a user to take JavaRDD + * and generate puts and send them to HBase. + * The complexity of managing the HConnection is + * removed from the developer + * + * @param javaDdd Original JavaRDD with data to iterate over + * @param tableName The name of the table to put into + * @param f Function to convert a value in the JavaRDD + * to a HBase Put + * @param autoFlush If autoFlush should be turned on + */ + def bulkPut[T](javaDdd: JavaRDD[T], + tableName: TableName, + f: Function[(T), Put], + autoFlush: Boolean) { + + hbc.bulkPut(javaDdd.rdd, tableName, (t:T) => f.call(t), autoFlush) + } + + /** + * A simple abstraction over the HBaseContext.streamMapPartition method. + * + * It allow addition support for a user to take a JavaDStream and + * generate puts and send them to HBase. + * + * The complexity of managing the HConnection is + * removed from the developer + * + * @param javaDstream Original DStream with data to iterate over + * @param tableName The name of the table to put into + * @param f Function to convert a value in + * the JavaDStream to a HBase Put + * @param autoFlush If autoFlush should be turned on + */ + def streamBulkPut[T](javaDstream: JavaDStream[T], + tableName: TableName, + f: Function[T,Put], + autoFlush: Boolean) = { + hbc.streamBulkPut(javaDstream.dstream, + tableName, + (t:T) => f.call(t), + autoFlush) + } + + /** + * A simple abstraction over the HBaseContext.foreachPartition method. + * + * It allow addition support for a user to take a JavaRDD and + * generate delete and send them to HBase. + * + * The complexity of managing the HConnection is + * removed from the developer + * + * @param javaRdd Original JavaRDD with data to iterate over + * @param tableName The name of the table to delete from + * @param f Function to convert a value in the JavaRDD to a + * HBase Deletes + * @param batchSize The number of deletes to batch before sending to HBase + */ + def bulkDelete[T](javaRdd: JavaRDD[T], tableName: TableName, + f: Function[T, Delete], batchSize:Integer) { + hbc.bulkDelete(javaRdd.rdd, tableName, (t:T) => f.call(t), batchSize) + } + + /** + * A simple abstraction over the HBaseContext.streamBulkMutation method. + * + * It allow addition support for a user to take a DStream and + * generate Increments and send them to HBase. 
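+ * For example, a hypothetical sketch (assumes a JavaDStream<String> `javaDstream`
+ * of "rowKey,family,qualifier,amount" records and a table named "t1"):
+ * {{{
+ *   hbaseContext.streamBulkIncrement(javaDstream, TableName.valueOf("t1"),
+ *     new Function<String, Increment>() {
+ *       public Increment call(String v) throws Exception {
+ *         String[] parts = v.split(",");
+ *         Increment increment = new Increment(Bytes.toBytes(parts[0]));
+ *         increment.addColumn(Bytes.toBytes(parts[1]), Bytes.toBytes(parts[2]),
+ *             Long.parseLong(parts[3]));
+ *         return increment;
+ *       }
+ *     }, 4); // batch size
+ * }}}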
+  /**
+   * A simple abstraction over the HBaseContext.streamBulkMutation method.
+   *
+   * It allows additional support for a user to take a DStream and
+   * generate Increments and send them to HBase.
+   *
+   * The complexity of managing the HConnection is
+   * removed from the developer
+   *
+   * @param javaDstream Original JavaDStream with data to iterate over
+   * @param tableName   The name of the table to increment into
+   * @param f           Function to convert a value in the JavaDStream to an
+   *                    HBase Increment
+   * @param batchSize   The number of increments to batch before sending to HBase
+   */
+  def streamBulkIncrement[T](javaDstream: JavaDStream[T],
+                             tableName: TableName,
+                             f: Function[T, Increment],
+                             batchSize: Integer) = {
+    hbc.streamBulkIncrement(javaDstream.dstream, tableName,
+      (t:T) => f.call(t),
+      batchSize)
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.streamBulkMutation method.
+   *
+   * It allows additional support for a user to take a JavaDStream and
+   * generate Deletes and send them to HBase.
+   *
+   * The complexity of managing the HConnection is
+   * removed from the developer
+   *
+   * @param javaDstream Original DStream with data to iterate over
+   * @param tableName   The name of the table to delete from
+   * @param f           Function to convert a value in the JavaDStream to an
+   *                    HBase Delete
+   * @param batchSize   The number of deletes to be sent at once
+   */
+  def streamBulkDelete[T](javaDstream: JavaDStream[T],
+                          tableName: TableName,
+                          f: Function[T, Delete],
+                          batchSize: Integer) = {
+    hbc.streamBulkDelete(javaDstream.dstream, tableName,
+      (t:T) => f.call(t),
+      batchSize)
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.mapPartition method.
+   *
+   * It allows additional support for a user to take a JavaRDD and generate a
+   * new RDD based on Gets and the results they bring back from HBase
+   *
+   * @param tableName     The name of the table to get from
+   * @param batchSize     Batch size of how many gets to retrieve in a single fetch
+   * @param javaRdd       Original JavaRDD with data to iterate over
+   * @param makeGet       Function to convert a value in the JavaRDD to an
+   *                      HBase Get
+   * @param convertResult This will convert the HBase Result object to
+   *                      whatever the user wants to put in the resulting
+   *                      JavaRDD
+   * @return              New JavaRDD that is created by the Gets to HBase
+   */
+  def bulkGet[T, U](tableName: TableName,
+                    batchSize:Integer,
+                    javaRdd: JavaRDD[T],
+                    makeGet: Function[T, Get],
+                    convertResult: Function[Result, U]): JavaRDD[U] = {
+    JavaRDD.fromRDD(hbc.bulkGet(tableName,
+      batchSize,
+      javaRdd.rdd,
+      (t:T) => makeGet.call(t),
+      (r:Result) => {convertResult.call(r)}))(fakeClassTag[U])
+  }
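[Editorial sketch, not part of this patch: a possible Java call to the bulkGet wrapper above, again reusing the assumed jsc/hbaseContext setup; converting each Result to its row key as a String mirrors what the Scala bulk-get examples later in this patch do.]

// Assumes: JavaSparkContext jsc, JavaHBaseContext hbaseContext, table "t1" populated earlier.
JavaRDD<byte[]> keys = jsc.parallelize(
    Arrays.asList(Bytes.toBytes("1"), Bytes.toBytes("2")));

JavaRDD<String> values = hbaseContext.bulkGet(
    TableName.valueOf("t1"),
    2,                 // batchSize
    keys,
    new Function<byte[], Get>() {
      public Get call(byte[] rowKey) throws Exception {
        return new Get(rowKey);
      }
    },
    new Function<Result, String>() {
      public String call(Result result) throws Exception {
        return result.isEmpty() ? "" : Bytes.toString(result.getRow());
      }
    });

for (String v : values.collect()) {
  System.out.println(v);
}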
+  /**
+   * A simple abstraction over the HBaseContext.streamBulkGet method.
+   *
+   * It allows additional support for a user to take a DStream and
+   * generate a new DStream based on Gets and the results
+   * they bring back from HBase
+   *
+   * @param tableName     The name of the table to get from
+   * @param batchSize     The number of gets to be batched together
+   * @param javaDStream   Original DStream with data to iterate over
+   * @param makeGet       Function to convert a value in the JavaDStream to an
+   *                      HBase Get
+   * @param convertResult This will convert the HBase Result object to
+   *                      whatever the user wants to put in the resulting
+   *                      JavaDStream
+   * @return              New JavaDStream that is created by the Gets to HBase
+   */
+  def streamBulkGet[T, U](tableName:TableName,
+                          batchSize:Integer,
+                          javaDStream: JavaDStream[T],
+                          makeGet: Function[T, Get],
+                          convertResult: Function[Result, U]): JavaDStream[U] = {
+    JavaDStream.fromDStream(hbc.streamBulkGet(tableName,
+      batchSize,
+      javaDStream.dstream,
+      (t:T) => makeGet.call(t),
+      (r:Result) => convertResult.call(r) )(fakeClassTag[U]))(fakeClassTag[U])
+  }
+
+  /**
+   * This function will use the native HBase TableInputFormat with the
+   * given scan object to generate a new JavaRDD
+   *
+   * @param tableName the name of the table to scan
+   * @param scans     the HBase scan object to use to read data from HBase
+   * @param f         function to convert a Result object from HBase into
+   *                  what the user wants in the final generated JavaRDD
+   * @return          new JavaRDD with results from scan
+   */
+  def hbaseRDD[U](tableName: TableName,
+                  scans: Scan,
+                  f: Function[(ImmutableBytesWritable, Result), U]):
+  JavaRDD[U] = {
+    JavaRDD.fromRDD(
+      hbc.hbaseRDD[U](tableName,
+        scans,
+        (v:(ImmutableBytesWritable, Result)) =>
+        f.call(v._1, v._2))(fakeClassTag[U]))(fakeClassTag[U])
+  }
+
+  /**
+   * An overloaded version of HBaseContext hbaseRDD that predefines the
+   * type of the resulting JavaRDD
+   *
+   * @param tableName the name of the table to scan
+   * @param scans     the HBase scan object to use to read data from HBase
+   * @return          New JavaRDD with results from scan
+   */
+  def hbaseRDD(tableName: TableName,
+               scans: Scan): JavaRDD[(Array[Byte], java.util.List[(Array[Byte], Array[Byte], Array[Byte])])] = {
+    JavaRDD.fromRDD(hbc.hbaseRDD(tableName, scans))
+  }
+
+  /**
+   * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef].
+   *
+   * This method is used to keep ClassTags out of the external Java API, as the Java compiler
+   * cannot produce them automatically. While this ClassTag-faking does please the compiler,
+   * it can cause problems at runtime if the Scala API relies on ClassTags for correctness.
+   *
+   * Often, though, a ClassTag[AnyRef] will not lead to incorrect behavior, just worse performance
+   * or security issues. For instance, an Array[AnyRef] can hold any type T, but may lose primitive
+   * specialization.
+   */
+  private[spark]
+  def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
+}
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkDeleteExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkDeleteExample.scala
new file mode 100644
index 0000000..9861a4a
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkDeleteExample.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.spark.example.hbasecontext + +import org.apache.hadoop.hbase.spark.HBaseContext +import org.apache.spark.SparkContext +import org.apache.hadoop.hbase.{TableName, HBaseConfiguration} +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.client.Delete +import org.apache.spark.SparkConf + +object HBaseBulkDeleteExample { + def main(args: Array[String]) { + if (args.length == 0) { + println("HBaseBulkDeletesExample {tableName} "); + return; + } + + val tableName = args(0) + + val sparkConf = new SparkConf().setAppName("HBaseBulkDeleteExample " + tableName) + val sc = new SparkContext(sparkConf) + try { + //[Array[Byte]] + val rdd = sc.parallelize(Array( + (Bytes.toBytes("1")), + (Bytes.toBytes("2")), + (Bytes.toBytes("3")), + (Bytes.toBytes("4")), + (Bytes.toBytes("5")) + ) + ) + + val conf = HBaseConfiguration.create() + conf.addResource(new Path("/etc/hbase/conf/core-site.xml")) + conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml")) + + val hbaseContext = new HBaseContext(sc, conf) + hbaseContext.bulkDelete[Array[Byte]](rdd, + TableName.valueOf(tableName), + putRecord => new Delete(putRecord), + 4) + } finally { + sc.stop() + } + } +} \ No newline at end of file diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkGetExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkGetExample.scala new file mode 100644 index 0000000..8cca0de --- /dev/null +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkGetExample.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.spark.example.hbasecontext + +import org.apache.hadoop.hbase.spark.HBaseContext +import org.apache.spark.SparkContext +import org.apache.hadoop.hbase.{CellUtil, TableName, HBaseConfiguration} +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.client.Get +import org.apache.hadoop.hbase.client.Result +import org.apache.spark.SparkConf + +object HBaseBulkGetExample { + def main(args: Array[String]) { + if (args.length == 0) { + System.out.println("HBaseBulkGetExample {tableName}"); + return ; + } + + val tableName = args(0) + + val sparkConf = new SparkConf().setAppName("HBaseBulkGetExample " + tableName) + val sc = new SparkContext(sparkConf) + + try { + + //[(Array[Byte])] + val rdd = sc.parallelize(Array( + (Bytes.toBytes("1")), + (Bytes.toBytes("2")), + (Bytes.toBytes("3")), + (Bytes.toBytes("4")), + (Bytes.toBytes("5")), + (Bytes.toBytes("6")), + (Bytes.toBytes("7")))) + + val conf = HBaseConfiguration.create() + conf.addResource(new Path("/etc/hbase/conf/core-site.xml")) + conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml")) + + val hbaseContext = new HBaseContext(sc, conf); + + val getRdd = hbaseContext.bulkGet[Array[Byte], String]( + TableName.valueOf(tableName), + 2, + rdd, + record => { + System.out.println("making Get") + new Get(record) + }, + (result: Result) => { + + val it = result.listCells().iterator() + val b = new StringBuilder + + b.append(Bytes.toString(result.getRow()) + ":") + + while (it.hasNext()) { + val cell = it.next() + val q = Bytes.toString(CellUtil.cloneQualifier(cell)) + if (q.equals("counter")) { + b.append("(" + q + "," + Bytes.toLong(CellUtil.cloneValue(cell)) + ")") + } else { + b.append("(" + q + "," + Bytes.toString(CellUtil.cloneValue(cell)) + ")") + } + } + b.toString + }) + + getRdd.collect.foreach(v => println(v)) + + } finally { + sc.stop() + } + } +} diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExample.scala new file mode 100644 index 0000000..8f66306 --- /dev/null +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExample.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.spark.example.hbasecontext + +import org.apache.hadoop.hbase.spark.HBaseContext +import org.apache.spark.SparkContext +import org.apache.hadoop.hbase.{TableName, HBaseConfiguration} +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.client.Put +import org.apache.spark.SparkConf + +object HBaseBulkPutExample { + def main(args: Array[String]) { + if (args.length == 0) { + System.out.println("HBaseBulkPutExample {tableName} {columnFamily}"); + return; + } + + val tableName = args(0) + val columnFamily = args(1) + + val sparkConf = new SparkConf().setAppName("HBaseBulkPutExample " + tableName + " " + columnFamily) + val sc = new SparkContext(sparkConf) + + try { + //[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])] + val rdd = sc.parallelize(Array( + (Bytes.toBytes("1"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))), + (Bytes.toBytes("2"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))), + (Bytes.toBytes("3"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))), + (Bytes.toBytes("4"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))), + (Bytes.toBytes("5"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5")))) + )) + + val conf = HBaseConfiguration.create() + conf.addResource(new Path("/etc/hbase/conf/core-site.xml")) + conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml")) + + val hbaseContext = new HBaseContext(sc, conf); + hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd, + TableName.valueOf(tableName), + (putRecord) => { + val put = new Put(putRecord._1) + putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3)) + put + }, + true); + } finally { + sc.stop() + } + } +} \ No newline at end of file diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExampleFromFile.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExampleFromFile.scala new file mode 100644 index 0000000..1efa03f --- /dev/null +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExampleFromFile.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.spark.example.hbasecontext + +import org.apache.hadoop.hbase.spark.HBaseContext +import org.apache.spark.SparkContext +import org.apache.hadoop.hbase.{TableName, HBaseConfiguration} +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.client.Put +import org.apache.hadoop.mapred.TextInputFormat +import org.apache.hadoop.io.LongWritable +import org.apache.hadoop.io.Text +import org.apache.spark.SparkConf + +object HBaseBulkPutExampleFromFile { + def main(args: Array[String]) { + if (args.length == 0) { + System.out.println("HBaseBulkPutExampleFromFile {tableName} {columnFamily} {inputFile}"); + return; + } + + val tableName = args(0) + val columnFamily = args(1) + val inputFile = args(2) + + val sparkConf = new SparkConf().setAppName("HBaseBulkPutExampleFromFile " + + tableName + " " + columnFamily + " " + inputFile) + val sc = new SparkContext(sparkConf) + + try { + var rdd = sc.hadoopFile( + inputFile, + classOf[TextInputFormat], + classOf[LongWritable], + classOf[Text]).map(v => { + System.out.println("reading-" + v._2.toString()) + v._2.toString() + }) + + val conf = HBaseConfiguration.create(); + conf.addResource(new Path("/etc/hbase/conf/core-site.xml")); + conf.addResource(new Path("/etc/hbase/conf/hdfs-site.xml")); + conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml")); + + val hbaseContext = new HBaseContext(sc, conf); + hbaseContext.bulkPut[String](rdd, + TableName.valueOf(tableName), + (putRecord) => { + System.out.println("hbase-" + putRecord) + val put = new Put(Bytes.toBytes("Value- " + putRecord)) + put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("1"), Bytes.toBytes(putRecord.length())) + put + }, + true); + } finally { + sc.stop() + } + } +} diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutTimestampExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutTimestampExample.scala new file mode 100644 index 0000000..649d176 --- /dev/null +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutTimestampExample.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.spark.example.hbasecontext + +import org.apache.hadoop.hbase.spark.HBaseContext +import org.apache.spark.SparkContext +import org.apache.hadoop.hbase.{TableName, HBaseConfiguration} +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.client.Put +import org.apache.spark.SparkConf + +object HBaseBulkPutTimestampExample { + def main(args: Array[String]) { + if (args.length == 0) { + System.out.println("HBaseBulkPutTimestampExample {tableName} {columnFamily}"); + return ; + } + + val tableName = args(0); + val columnFamily = args(1); + + val sparkConf = new SparkConf().setAppName("HBaseBulkPutTimestampExample " + tableName + " " + columnFamily) + val sc = new SparkContext(sparkConf) + + try { + + val rdd = sc.parallelize(Array( + (Bytes.toBytes("6"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))), + (Bytes.toBytes("7"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))), + (Bytes.toBytes("8"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))), + (Bytes.toBytes("9"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))), + (Bytes.toBytes("10"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5")))))); + + val conf = HBaseConfiguration.create(); + conf.addResource(new Path("/etc/hbase/conf/core-site.xml")); + conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml")); + + val timeStamp = System.currentTimeMillis() + + val hbaseContext = new HBaseContext(sc, conf); + hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd, + TableName.valueOf(tableName), + (putRecord) => { + val put = new Put(putRecord._1) + putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, timeStamp, putValue._3)) + put + }, + true); + } finally { + sc.stop() + } + } +} diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseDistributedScanExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseDistributedScanExample.scala new file mode 100644 index 0000000..7eed504 --- /dev/null +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseDistributedScanExample.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.spark.example.hbasecontext + +import org.apache.hadoop.hbase.spark.HBaseContext +import org.apache.spark.SparkContext +import org.apache.hadoop.hbase.{TableName, HBaseConfiguration} +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.client.Scan +import org.apache.spark.SparkConf + +object HBaseDistributedScanExample { + def main(args: Array[String]) { + if (args.length == 0) { + System.out.println("GenerateGraphs {tableName}") + return + } + + val tableName = args(0) + + val sparkConf = new SparkConf().setAppName("HBaseDistributedScanExample " + tableName ) + val sc = new SparkContext(sparkConf) + + try { + val conf = HBaseConfiguration.create() + conf.addResource(new Path("/etc/hbase/conf/core-site.xml")) + conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml")) + + val hbaseContext = new HBaseContext(sc, conf) + + var scan = new Scan() + scan.setCaching(100) + + var getRdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan) + + getRdd.foreach(v => println(Bytes.toString(v._1))) + + getRdd.collect.foreach(v => println(Bytes.toString(v._1))) + } finally { + sc.stop() + } + } +} \ No newline at end of file diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseStreamingBulkPutExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseStreamingBulkPutExample.scala new file mode 100644 index 0000000..144bdfb --- /dev/null +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseStreamingBulkPutExample.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.spark.example.hbasecontext + +import org.apache.hadoop.hbase.spark.HBaseContext +import org.apache.spark.SparkContext +import org.apache.hadoop.hbase.{TableName, HBaseConfiguration} +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.client.Put +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.Seconds +import org.apache.spark.SparkConf + +object HBaseStreamingBulkPutExample { + def main(args: Array[String]) { + if (args.length == 0) { + System.out.println("HBaseStreamingBulkPutExample {host} {port} {tableName} {columnFamily}"); + return + } + + val host = args(0) + val port = args(1) + val tableName = args(2) + val columnFamily = args(3) + + val sparkConf = new SparkConf().setAppName("HBaseBulkPutTimestampExample " + tableName + " " + columnFamily) + sparkConf.set("spark.cleaner.ttl", "120000") + val sc = new SparkContext(sparkConf) + try { + val ssc = new StreamingContext(sc, Seconds(1)) + + val lines = ssc.socketTextStream(host, port.toInt) + + val conf = HBaseConfiguration.create() + conf.addResource(new Path("/etc/hbase/conf/core-site.xml")) + conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml")) + + val hbaseContext = new HBaseContext(sc, conf); + + hbaseContext.streamBulkPut[String](lines, + TableName.valueOf(tableName), + (putRecord) => { + if (putRecord.length() > 0) { + val put = new Put(Bytes.toBytes(putRecord)) + put.add(Bytes.toBytes("c"), Bytes.toBytes("foo"), Bytes.toBytes("bar")) + put + } else { + null + } + }, + false) + ssc.start() + ssc.awaitTerminationOrTimeout(60000) + } finally { + sc.stop() + } + } +} \ No newline at end of file diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkDeleteExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkDeleteExample.scala new file mode 100644 index 0000000..150bc23 --- /dev/null +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkDeleteExample.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.spark.example.rdd + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hbase.client.Delete +import org.apache.hadoop.hbase.{TableName, HBaseConfiguration} +import org.apache.hadoop.hbase.spark.HBaseContext +import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._ +import org.apache.hadoop.hbase.util.Bytes + +import org.apache.spark.{SparkContext, SparkConf} + +object HBaseBulkDeleteExample { + def main(args: Array[String]) { + if (args.length == 0) { + println("HBaseBulkDeletesExample {tableName} "); + return; + } + + val tableName = args(0) + + val sparkConf = new SparkConf().setAppName("HBaseBulkDeleteExample " + tableName) + val sc = new SparkContext(sparkConf) + try { + //[Array[Byte]] + val rdd = sc.parallelize(Array( + (Bytes.toBytes("1")), + (Bytes.toBytes("2")), + (Bytes.toBytes("3")), + (Bytes.toBytes("4")), + (Bytes.toBytes("5")) + )) + + val conf = HBaseConfiguration.create() + conf.addResource(new Path("/etc/hbase/conf/core-site.xml")) + conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml")) + + val hbaseContext = new HBaseContext(sc, conf) + + rdd.hbaseBulkDelete(hbaseContext, TableName.valueOf(tableName), + putRecord => new Delete(putRecord), + 4) + + } finally { + sc.stop() + } + } +} \ No newline at end of file diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkGetExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkGetExample.scala new file mode 100644 index 0000000..c7f7c27 --- /dev/null +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkGetExample.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.spark.example.rdd + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hbase.client.{Result, Get} +import org.apache.hadoop.hbase.{CellUtil, TableName, HBaseConfiguration} +import org.apache.hadoop.hbase.spark.HBaseContext +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._ +import org.apache.spark.{SparkContext, SparkConf} + +object HBaseBulkGetExample { + def main(args: Array[String]) { + if (args.length == 0) { + System.out.println("HBaseBulkGetExample {tableName}"); + return ; + } + + val tableName = args(0) + + val sparkConf = new SparkConf().setAppName("HBaseBulkGetExample " + tableName) + val sc = new SparkContext(sparkConf) + + try { + + //[(Array[Byte])] + val rdd = sc.parallelize(Array( + (Bytes.toBytes("1")), + (Bytes.toBytes("2")), + (Bytes.toBytes("3")), + (Bytes.toBytes("4")), + (Bytes.toBytes("5")), + (Bytes.toBytes("6")), + (Bytes.toBytes("7")))) + + val conf = HBaseConfiguration.create() + conf.addResource(new Path("/etc/hbase/conf/core-site.xml")) + conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml")) + + val hbaseContext = new HBaseContext(sc, conf); + + val getRdd = rdd.hbaseBulkGet[String](hbaseContext, TableName.valueOf(tableName), 2, + record => { + System.out.println("making Get") + new Get(record) + }, + (result: Result) => { + + val it = result.listCells().iterator() + val b = new StringBuilder + + b.append(Bytes.toString(result.getRow()) + ":") + + while (it.hasNext()) { + val cell = it.next() + val q = Bytes.toString(CellUtil.cloneQualifier(cell)) + if (q.equals("counter")) { + b.append("(" + q + "," + Bytes.toLong(CellUtil.cloneValue(cell)) + ")") + } else { + b.append("(" + q + "," + Bytes.toString(CellUtil.cloneValue(cell)) + ")") + } + } + b.toString + }) + + getRdd.collect.foreach(v => println(v)) + + } finally { + sc.stop() + } + } +} diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkPutExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkPutExample.scala new file mode 100644 index 0000000..a6ad522 --- /dev/null +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkPutExample.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.spark.example.rdd + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hbase.client.Put +import org.apache.hadoop.hbase.spark.HBaseContext +import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._ +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.{HBaseConfiguration, TableName} +import org.apache.spark.{SparkConf, SparkContext} + +object HBaseBulkPutExample { + def main(args: Array[String]) { + if (args.length == 0) { + System.out.println("HBaseBulkPutExample {tableName} {columnFamily}"); + return; + } + + val tableName = args(0) + val columnFamily = args(1) + + val sparkConf = new SparkConf().setAppName("HBaseBulkPutExample " + tableName + " " + columnFamily) + val sc = new SparkContext(sparkConf) + + try { + //[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])] + val rdd = sc.parallelize(Array( + (Bytes.toBytes("1"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))), + (Bytes.toBytes("2"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))), + (Bytes.toBytes("3"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))), + (Bytes.toBytes("4"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))), + (Bytes.toBytes("5"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5")))) + )) + + val conf = HBaseConfiguration.create() + conf.addResource(new Path("/etc/hbase/conf/core-site.xml")) + conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml")) + + val hbaseContext = new HBaseContext(sc, conf); + + rdd.hbaseBulkPut(hbaseContext, TableName.valueOf(tableName), + (putRecord) => { + val put = new Put(putRecord._1) + putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3)) + put + }, + true) + + } finally { + sc.stop() + } + } + } \ No newline at end of file diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseForeachPartitionExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseForeachPartitionExample.scala new file mode 100644 index 0000000..3c9669d --- /dev/null +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseForeachPartitionExample.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.spark.example.rdd + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hbase.client.Put +import org.apache.hadoop.hbase.{TableName, HBaseConfiguration} +import org.apache.hadoop.hbase.spark.HBaseContext +import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._ +import org.apache.hadoop.hbase.util.Bytes +import org.apache.spark.{SparkContext, SparkConf} + +object HBaseForeachPartitionExample { + def main(args: Array[String]) { + if (args.length == 0) { + System.out.println("HBaseBulkPutExample {tableName} {columnFamily}"); + return; + } + + val tableName = args(0) + val columnFamily = args(1) + + val sparkConf = new SparkConf().setAppName("HBaseBulkPutExample " + tableName + " " + columnFamily) + val sc = new SparkContext(sparkConf) + + try { + //[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])] + val rdd = sc.parallelize(Array( + (Bytes.toBytes("1"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))), + (Bytes.toBytes("2"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))), + (Bytes.toBytes("3"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))), + (Bytes.toBytes("4"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))), + (Bytes.toBytes("5"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5")))) + )) + + val conf = HBaseConfiguration.create() + conf.addResource(new Path("/etc/hbase/conf/core-site.xml")) + conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml")) + + val hbaseContext = new HBaseContext(sc, conf) + + + rdd.hbaseForeachPartition(hbaseContext, + (it, connection) => { + val m = connection.getBufferedMutator(TableName.valueOf(tableName)) + + it.foreach(r => { + val put = new Put(r._1) + r._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3)) + m.mutate(put) + }) + m.flush() + m.close() + }) + + } finally { + sc.stop() + } + } +} diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseMapPartitionExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseMapPartitionExample.scala new file mode 100644 index 0000000..90b46cb --- /dev/null +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseMapPartitionExample.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.spark.example.rdd + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hbase.client.{Result, Get} +import org.apache.hadoop.hbase.{TableName, HBaseConfiguration} +import org.apache.hadoop.hbase.spark.HBaseContext +import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._ +import org.apache.hadoop.hbase.util.Bytes +import org.apache.spark.{SparkContext, SparkConf} + +object HBaseMapPartitionExample { + def main(args: Array[String]) { + if (args.length == 0) { + System.out.println("HBaseBulkGetExample {tableName}"); + return ; + } + + val tableName = args(0) + + val sparkConf = new SparkConf().setAppName("HBaseBulkGetExample " + tableName) + val sc = new SparkContext(sparkConf) + + try { + + //[(Array[Byte])] + val rdd = sc.parallelize(Array( + (Bytes.toBytes("1")), + (Bytes.toBytes("2")), + (Bytes.toBytes("3")), + (Bytes.toBytes("4")), + (Bytes.toBytes("5")), + (Bytes.toBytes("6")), + (Bytes.toBytes("7")))) + + val conf = HBaseConfiguration.create() + conf.addResource(new Path("/etc/hbase/conf/core-site.xml")) + conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml")) + + val hbaseContext = new HBaseContext(sc, conf); + + val getRdd = rdd.hbaseMapPartition[String](hbaseContext, (it, connection) => { + val table = connection.getTable(TableName.valueOf(tableName)) + it.map{r => + //batching would be faster. This is just an example + val result = table.get(new Get(r)) + + val it = result.listCells().iterator() + val b = new StringBuilder + + b.append(Bytes.toString(result.getRow()) + ":") + + while (it.hasNext()) { + val cell = it.next() + val q = Bytes.toString(cell.getQualifierArray) + if (q.equals("counter")) { + b.append("(" + q + "," + Bytes.toLong(cell.getValueArray) + ")") + } else { + b.append("(" + q + "," + Bytes.toString(cell.getValueArray) + ")") + } + } + b.toString + } + }) + + getRdd.collect.foreach(v => println(v)) + + } finally { + sc.stop() + } + } +} diff --git a/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaLocal.java b/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaLocal.java new file mode 100644 index 0000000..3e859d3 --- /dev/null +++ b/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaLocal.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.spark; + +import java.io.File; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.VoidFunction; +import org.junit.After; + +import scala.Tuple2; + +import com.google.common.io.Files; + +public class TestJavaLocal { + + private static transient JavaSparkContext jsc; + private static transient File tempDir; + static HBaseTestingUtility htu; + + static String tableName = "t1"; + static String columnFamily = "c"; + + public static void main(String[] agrs) { + setUp(); + + Configuration conf = htu.getConfiguration(); + + JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf); + + List list = new ArrayList(); + list.add(Bytes.toBytes("1")); + list.add(Bytes.toBytes("2")); + list.add(Bytes.toBytes("3")); + list.add(Bytes.toBytes("4")); + list.add(Bytes.toBytes("5")); + + JavaRDD rdd = jsc.parallelize(list); + + + hbaseContext.foreachPartition(rdd, new VoidFunction, Connection>>() { + + + public void call(Tuple2, Connection> t) + throws Exception { + Table table1 = t._2().getTable(TableName.valueOf("Foo")); + + Iterator it = t._1(); + + while (it.hasNext()) { + byte[] b = it.next(); + Result r = table1.get(new Get(b)); + if (r.getExists()) { + table1.put(new Put(b)); + } + } + } + }); + + //This is me + hbaseContext.foreach(rdd, new VoidFunction>() { + + public void call(Tuple2 t) + throws Exception { + Table table1 = t._2().getTable(TableName.valueOf("Foo")); + + byte[] b = t._1(); + Result r = table1.get(new Get(b)); + if (r.getExists()) { + table1.put(new Put(b)); + } + } + }); + tearDown(); + } + + public static void setUp() { + jsc = new JavaSparkContext("local", "JavaHBaseContextSuite"); + jsc.addJar("spark.jar"); + + tempDir = Files.createTempDir(); + tempDir.deleteOnExit(); + + htu = HBaseTestingUtility.createLocalHTU(); + try { + System.out.println("cleaning up test dir"); + + htu.cleanupTestDir(); + + System.out.println("starting minicluster"); + + htu.startMiniZKCluster(); + htu.startMiniHBaseCluster(1, 1); + + System.out.println(" - minicluster started"); + + try { + htu.deleteTable(Bytes.toBytes(tableName)); + } catch (Exception e) { + System.out.println(" - no table " + tableName + " found"); + } + + System.out.println(" - creating table " + tableName); + htu.createTable(Bytes.toBytes(tableName), Bytes.toBytes(columnFamily)); + System.out.println(" - created table"); + } catch (Exception e1) { + throw new RuntimeException(e1); + } + } + + @After + public static void tearDown() { + try { + htu.deleteTable(Bytes.toBytes(tableName)); + System.out.println("shuting down minicluster"); + htu.shutdownMiniHBaseCluster(); + htu.shutdownMiniZKCluster(); + System.out.println(" - minicluster shut down"); + htu.cleanupTestDir(); + } catch (Exception e) { + throw new RuntimeException(e); + } + jsc.stop(); + jsc = null; + } +} \ No newline at end of file diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseContextSuite.scala 
b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseContextSuite.scala new file mode 100644 index 0000000..69455ba --- /dev/null +++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseContextSuite.scala @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.spark + +import org.scalatest.FunSuite +import org.apache.spark._ +import org.apache.hadoop.hbase.{CellUtil, Cell, TableName, HBaseTestingUtility} +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.client.Scan +import org.apache.hadoop.hbase.client.Get +import org.apache.hadoop.hbase.client.Put +import org.apache.hadoop.hbase.client.ConnectionFactory +import org.apache.hadoop.hbase.client.Increment +import org.apache.hadoop.hbase.client.Delete +import org.apache.hadoop.hbase.client.Result + +class HBaseContextSuite extends FunSuite with LocalSparkContext { + + var htu: HBaseTestingUtility = null + + val tableName = "t1" + val columnFamily = "c" + + override def beforeAll() { + htu = HBaseTestingUtility.createLocalHTU() + + htu.cleanupTestDir() + println("starting minicluster") + htu.startMiniZKCluster(); + htu.startMiniHBaseCluster(1, 1); + println(" - minicluster started") + try { + htu.deleteTable(TableName.valueOf(tableName)) + } catch { + case e: Exception => { + println(" - no table " + tableName + " found") + } + } + println(" - creating table " + tableName) + htu.createTable(TableName.valueOf(tableName), Bytes.toBytes(columnFamily)) + println(" - created table") + + + sc = new SparkContext("local", "test") + } + + override def afterAll() { + htu.deleteTable(TableName.valueOf(tableName)) + println("shuting down minicluster") + htu.shutdownMiniHBaseCluster() + htu.shutdownMiniZKCluster() + println(" - minicluster shut down") + htu.cleanupTestDir() + + sc.stop() + } + + test("bulkput to test HBase client") { + val config = htu.getConfiguration + val rdd = sc.parallelize(Array( + (Bytes.toBytes("1"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")))), + (Bytes.toBytes("2"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("foo2")))), + (Bytes.toBytes("3"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("c"), Bytes.toBytes("foo3")))), + (Bytes.toBytes("4"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("d"), Bytes.toBytes("foo")))), + (Bytes.toBytes("5"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("e"), Bytes.toBytes("bar")))))) + + val hbaseContext = new HBaseContext(sc, config); + hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd, + TableName.valueOf(tableName), + (putRecord) => { + val put = new Put(putRecord._1) + putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, 
putValue._3)) + put + }, + true); + + val connection = ConnectionFactory.createConnection(config) + val table = connection.getTable(TableName.valueOf("t1")) + + val foo1 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("1"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a")))) + assert(foo1.equals("foo1"), foo1 + "!=foo1") + + assert(Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("2"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("b")))).equals("foo2")) + + assert(Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("3"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("c")))).equals("foo3")) + + assert(Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("4"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("d")))).equals("foo")) + + assert(Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("5"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("e")))).equals("bar")) + + } + + test("bulkDelete to test HBase client") { + val config = htu.getConfiguration + val connection = ConnectionFactory.createConnection(config) + val table = connection.getTable(TableName.valueOf("t1")) + + var put = new Put(Bytes.toBytes("delete1")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")) + table.put(put) + put = new Put(Bytes.toBytes("delete2")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2")) + table.put(put) + put = new Put(Bytes.toBytes("delete3")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3")) + table.put(put) + sc = new SparkContext("local", "test") + val rdd = sc.parallelize(Array( + (Bytes.toBytes("delete1")), + (Bytes.toBytes("delete3")))) + + val hbaseContext = new HBaseContext(sc, config); + hbaseContext.bulkDelete[Array[Byte]](rdd, + TableName.valueOf(tableName), + putRecord => new Delete(putRecord), + 4); + + assert(table.get(new Get(Bytes.toBytes("delete1"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a")) == null) + assert(table.get(new Get(Bytes.toBytes("delete3"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a")) == null) + assert(Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("delete2"))). 
+ getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a")))).equals("foo2")) + + } + + test("bulkGet to test HBase client") { + val config = htu.getConfiguration + val connection = ConnectionFactory.createConnection(config) + val table = connection.getTable(TableName.valueOf("t1")) + + var put = new Put(Bytes.toBytes("get1")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")) + table.put(put) + put = new Put(Bytes.toBytes("get2")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2")) + table.put(put) + put = new Put(Bytes.toBytes("get3")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3")) + table.put(put) + sc = new SparkContext("local", "test") + val rdd = sc.parallelize(Array( + (Bytes.toBytes("get1")), + (Bytes.toBytes("get2")), + (Bytes.toBytes("get3")), + (Bytes.toBytes("get4")))) + val hbaseContext = new HBaseContext(sc, config); + + val getRdd = hbaseContext.bulkGet[Array[Byte], Object]( + TableName.valueOf(tableName), + 2, + rdd, + record => { + new Get(record) + }, + (result: Result) => { + if (result.listCells() != null) { + val it = result.listCells().iterator() + val B = new StringBuilder + + B.append(Bytes.toString(result.getRow()) + ":") + + while (it.hasNext()) { + val cell = it.next() + val q = Bytes.toString(CellUtil.cloneQualifier(cell)) + if (q.equals("counter")) { + B.append("(" + q + "," + Bytes.toLong(CellUtil.cloneValue(cell)) + ")") + } else { + B.append("(" + q + "," + Bytes.toString(CellUtil.cloneValue(cell)) + ")") + } + } + "" + B.toString + } else { + "" + } + }) + + val getArray = getRdd.collect + + getArray.foreach(f => println(f)); + + assert(getArray.length == 4) + assert(getArray.contains("get1:(a,foo1)")) + assert(getArray.contains("get2:(a,foo2)")) + assert(getArray.contains("get3:(a,foo3)")) + + } + + test("distributedScan to test HBase client") { + val config = htu.getConfiguration + val connection = ConnectionFactory.createConnection(config) + val table = connection.getTable(TableName.valueOf("t1")) + + var put = new Put(Bytes.toBytes("scan1")) + + + + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")) + table.put(put) + put = new Put(Bytes.toBytes("scan2")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2")) + table.put(put) + put = new Put(Bytes.toBytes("scan3")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3")) + table.put(put) + put = new Put(Bytes.toBytes("scan4")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3")) + table.put(put) + put = new Put(Bytes.toBytes("scan5")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3")) + table.put(put) + + + sc = new SparkContext("local", "test") + val hbaseContext = new HBaseContext(sc, config) + + var scan = new Scan() + scan.setCaching(100) + scan.setStartRow(Bytes.toBytes("scan2")) + scan.setStopRow(Bytes.toBytes("scan4_")) + + val scanRdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan) + + val scanList = scanRdd.collect + + assert(scanList.length == 3) + } + +} diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/LocalSparkContext.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/LocalSparkContext.scala new file mode 100644 index 0000000..8d4861b --- /dev/null +++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/LocalSparkContext.scala @@ -0,0 +1,65 @@ +/* 
+ * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.spark + +import _root_.io.netty.util.internal.logging.{Slf4JLoggerFactory, InternalLoggerFactory} +import org.apache.spark.SparkContext +import org.scalatest.BeforeAndAfterAll +import org.scalatest.BeforeAndAfterEach +import org.scalatest.Suite + +/** Manages a local `sc` {@link SparkContext} variable, correctly stopping it after each test. */ +trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self: Suite => + + @transient var sc: SparkContext = _ + + override def beforeAll() { + InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory()) + super.beforeAll() + } + + override def afterEach() { + resetSparkContext() + super.afterEach() + } + + def resetSparkContext(): Unit = { + LocalSparkContext.stop(sc) + sc = null + } + +} + +object LocalSparkContext { + def stop(sc: SparkContext) { + if (sc != null) { + sc.stop() + } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.driver.port") + } + + /** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */ + def withSpark[T](sc: SparkContext)(f: SparkContext => T): T = { + try { + f(sc) + } finally { + stop(sc) + } + } + +} diff --git a/pom.xml b/pom.xml index 5da039f..421ed6f 100644 --- a/pom.xml +++ b/pom.xml @@ -66,6 +66,7 @@ hbase-rest hbase-checkstyle hbase-shaded + hbase-spark
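[Editorial sketch, not part of this patch: with the new hbase-spark module registered in the parent pom above and on an application's classpath, the distributed-scan path can also be driven from Java through JavaHBaseContext.hbaseRDD. The setup (jsc, hbaseContext) and table "t1" are the same illustrative assumptions as in the earlier Java sketches; ImmutableBytesWritable comes from org.apache.hadoop.hbase.io.]

// Assumes the jsc/conf/hbaseContext setup from the earlier Java sketches.
Scan scan = new Scan();
scan.setCaching(100);

JavaRDD<String> rows = hbaseContext.hbaseRDD(
    TableName.valueOf("t1"),
    scan,
    new Function<Tuple2<ImmutableBytesWritable, Result>, String>() {
      public String call(Tuple2<ImmutableBytesWritable, Result> entry) throws Exception {
        // Keep just the row key of each scanned row.
        return Bytes.toString(entry._2().getRow());
      }
    });

for (String row : rows.collect()) {
  System.out.println(row);
}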