From e7f659cac4498bdb94fd0e2b9e4339a06eaba9c1 Mon Sep 17 00:00:00 2001
From: ted malaska
Date: Mon, 12 Oct 2015 11:23:16 -0400
Subject: [PATCH] HBASE-14158 Add documentation for Initial Release for
 HBase-Spark Module integration

Signed-off-by: Misty Stanley-Jones
---
 src/main/asciidoc/_chapters/spark.adoc | 451 +++++++++++++++++++++++++++++++++
 1 file changed, 451 insertions(+)
 create mode 100644 src/main/asciidoc/_chapters/spark.adoc

diff --git a/src/main/asciidoc/_chapters/spark.adoc b/src/main/asciidoc/_chapters/spark.adoc
new file mode 100644
index 0000000..9b5179b
--- /dev/null
+++ b/src/main/asciidoc/_chapters/spark.adoc
@@ -0,0 +1,451 @@
+////
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+////
+
+[[spark]]
+= HBase and Spark
+:doctype: book
+:numbered:
+:toc: left
+:icons: font
+:experimental:
+
+link:http://spark.apache.org/[Apache Spark] is a software framework that is used
+to process data in memory in a distributed manner, and is replacing MapReduce in
+many use cases.
+
+Spark itself is out of scope of this document; please refer to the Spark site for
+more information on the Spark project and subprojects. This document will focus
+on four main interaction points between Spark and HBase. Those interaction points are:
+
+Basic Spark::
+  The ability to have an HBase Connection at any point in your Spark DAG.
+Spark Streaming::
+  The ability to have an HBase Connection at any point in your Spark Streaming
+  application.
+Spark Bulk Load::
+  The ability to write directly to HBase HFiles for bulk insertion into HBase.
+SparkSQL/DataFrames::
+  The ability to write SparkSQL that draws on tables that are represented in HBase.
+
+The following sections will walk through examples of all these interaction points.
+
+== Basic Spark
+
+This section discusses Spark HBase integration at the lowest and simplest levels.
+All the other interaction points are built upon the concepts that will be described
+here.
+
+At the root of all Spark and HBase integration is the HBaseContext. The HBaseContext
+takes in HBase configurations and pushes them to the Spark executors. This allows
+us to have an HBase Connection per Spark Executor in a static location.
+
+For reference, Spark Executors can be on the same nodes as the Region Servers or
+on different nodes; there is no dependence on co-location. Think of every Spark
+Executor as a multi-threaded client application. This allows any Spark Tasks
+running on the executors to access the shared Connection object.
+
+.HBaseContext Usage Example
+====
+
+This example shows how HBaseContext can be used to do a `foreachPartition` on an RDD
+in Scala:
+
+[source, scala]
+----
+val sc = new SparkContext("local", "test")
+val config = new HBaseConfiguration()
+
+...
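+
+// Hypothetical setup (not part of the original example): the RDD used below is
+// assumed to hold (rowKey, Array[(family, qualifier, value)]) tuples, which is
+// the shape the put logic in the foreachPartition expects. For instance:
+val rdd = sc.parallelize(Seq(
+  (Bytes.toBytes("1"),
+    Array((Bytes.toBytes("c"), Bytes.toBytes("a"), Bytes.toBytes("value1")))),
+  (Bytes.toBytes("2"),
+    Array((Bytes.toBytes("c"), Bytes.toBytes("a"), Bytes.toBytes("value2"))))))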
+
+val hbaseContext = new HBaseContext(sc, config)
+
+rdd.hbaseForeachPartition(hbaseContext, (it, conn) => {
+  val bufferedMutator = conn.getBufferedMutator(TableName.valueOf("t1"))
+  it.foreach((putRecord) => {
+    val put = new Put(putRecord._1)
+    putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
+    bufferedMutator.mutate(put)
+  })
+  bufferedMutator.flush()
+  bufferedMutator.close()
+})
+----
+
+Here is the same example implemented in Java:
+
+[source, java]
+----
+JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+
+try {
+  List<byte[]> list = new ArrayList<>();
+  list.add(Bytes.toBytes("1"));
+  ...
+  list.add(Bytes.toBytes("5"));
+
+  JavaRDD<byte[]> rdd = jsc.parallelize(list);
+  Configuration conf = HBaseConfiguration.create();
+
+  JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+  hbaseContext.foreachPartition(rdd,
+      new VoidFunction<Tuple2<Iterator<byte[]>, Connection>>() {
+        public void call(Tuple2<Iterator<byte[]>, Connection> t)
+            throws Exception {
+          Table table = t._2().getTable(TableName.valueOf(tableName));
+          BufferedMutator mutator = t._2().getBufferedMutator(TableName.valueOf(tableName));
+          while (t._1().hasNext()) {
+            byte[] b = t._1().next();
+            Result r = table.get(new Get(b));
+            if (r.getExists()) {
+              mutator.mutate(new Put(b));
+            }
+          }
+
+          mutator.flush();
+          mutator.close();
+          table.close();
+        }
+      });
+} finally {
+  jsc.stop();
+}
+----
+====
+
+All functionality between Spark and HBase is supported both in Scala and in
+Java, with the exception of SparkSQL, which supports any language that is
+supported by Spark. For the remainder of this documentation we will focus on
+Scala examples.
+
+The examples above illustrate how to do a `foreachPartition` with a connection. A
+number of other Spark base functions are supported out of the box:
+
+// tag::spark_base_functions[]
+`bulkPut`:: For massively parallel sending of puts to HBase
+`bulkDelete`:: For massively parallel sending of deletes to HBase
+`bulkGet`:: For massively parallel sending of gets to HBase to create a new RDD
+`mapPartition`:: To do a Spark Map function with a Connection object to allow full
+access to HBase
+`hBaseRDD`:: To simplify a distributed scan to create an RDD
+// end::spark_base_functions[]
+
+For examples of all these functionalities, see the HBase-Spark Module.
+
+== Spark Streaming
+http://spark.apache.org/streaming/[Spark Streaming] is a micro-batching stream
+processing framework built on top of Spark. HBase and Spark Streaming make great
+companions, in that HBase can provide the following benefits alongside Spark
+Streaming:
+
+* A place to grab reference data or profile data on the fly
+* A place to store counts or aggregates in a way that supports Spark Streaming's
+promise of _exactly once_ processing.
+
+The HBase-Spark module's integration points with Spark Streaming are similar to
+its normal Spark integration points, in that the following commands are possible
+straight off a Spark Streaming DStream:
+
+include::spark.adoc[tags=spark_base_functions]
+
+.`bulkPut` Example with DStreams
+====
+
+Below is an example of `bulkPut` with DStreams. It is very close in feel to the RDD
+bulk put.
+
+[source, scala]
+----
+val sc = new SparkContext("local", "test")
+val config = new HBaseConfiguration()
+
+val hbaseContext = new HBaseContext(sc, config)
+val ssc = new StreamingContext(sc, Milliseconds(200))
+
+val rdd1 = ...
+val rdd2 = ...
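+
+// rdd1 and rdd2 are assumed to hold the same record shape used in the put
+// function below: (rowKey, Array[(family, qualifier, value)]) tuples. The
+// queue-backed DStream built next is simply a convenient stand-in for a real
+// streaming source.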
+
+val queue = mutable.Queue[RDD[(Array[Byte], Array[(Array[Byte],
+  Array[Byte], Array[Byte])])]]()
+
+queue += rdd1
+queue += rdd2
+
+val dStream = ssc.queueStream(queue)
+
+dStream.hbaseBulkPut(
+  hbaseContext,
+  TableName.valueOf(tableName),
+  (putRecord) => {
+    val put = new Put(putRecord._1)
+    putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
+    put
+  })
+----
+
+There are three inputs to the `hbaseBulkPut` function:
+
+. The hbaseContext that carries the broadcast configuration information, linking us
+to the HBase Connections in the executors.
+. The table name of the table we are putting data into.
+. A function that will convert a record in the DStream into an HBase Put object.
+====
+
+== Bulk Load
+
+Spark bulk load follows the MapReduce implementation of bulk load very closely.
+In short, a partitioner partitions based on region splits and
+the row keys are sent to the reducers in order, so that HFiles can be written
+out. In Spark terms, the bulk load will be focused around a
+`repartitionAndSortWithinPartitions` followed by a `foreachPartition`.
+
+The only major difference with the Spark implementation compared to the
+MapReduce implementation is that the column qualifier is included in the shuffle
+ordering process. This was done because the MapReduce bulk load implementation
+would have memory issues with loading rows with a large number of columns, as a
+result of the sorting of those columns being done in the memory of the reducer JVM.
+Instead, that ordering is done in the Spark Shuffle, so there should no longer
+be a limit to the number of columns in a row for bulk loading.
+
+.Bulk Loading Example
+====
+
+The following example shows bulk loading with Spark.
+
+[source, scala]
+----
+val sc = new SparkContext("local", "test")
+val config = new HBaseConfiguration()
+
+val hbaseContext = new HBaseContext(sc, config)
+
+val stagingFolder = ...
+
+rdd.hbaseBulkLoad(TableName.valueOf(tableName),
+  t => {
+    val rowKey = t._1
+    val family: Array[Byte] = t._2(0)._1
+    val qualifier = t._2(0)._2
+    val value = t._2(0)._3
+
+    val keyFamilyQualifier = new KeyFamilyQualifier(rowKey, family, qualifier)
+
+    Seq((keyFamilyQualifier, value)).iterator
+  },
+  stagingFolder.getPath)
+
+val load = new LoadIncrementalHFiles(config)
+load.doBulkLoad(new Path(stagingFolder.getPath),
+  conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
+----
+====
+
+The `hbaseBulkLoad` function takes three required parameters:
+
+. The table name of the table we intend to bulk load into.
+
+. A function that will convert a record in the RDD to a tuple key-value pair, with
+the tuple key being a KeyFamilyQualifier object and the value being the cell value.
+The KeyFamilyQualifier object will hold the RowKey, Column Family, and Column Qualifier.
+The shuffle will partition on the RowKey but will sort by all three values. A record
+with several cells can emit one pair per cell, as sketched below.
+
+. The temporary path for the HFile to be written out to.
+
+Following the Spark bulk load command, use HBase's LoadIncrementalHFiles object
+to load the newly created HFiles into HBase.
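+
+The conversion function in the example above only emits the first cell of each
+record. Because the function returns an iterator, a record that carries several
+cells can emit one (KeyFamilyQualifier, value) pair per cell. Below is a minimal
+sketch of such a function, assuming the same record shape as in the example above,
+namely `(rowKey, Array[(family, qualifier, value)])`:
+
+[source, scala]
+----
+rdd.hbaseBulkLoad(TableName.valueOf(tableName),
+  t => {
+    val rowKey = t._1
+    // Emit one (KeyFamilyQualifier, value) tuple for every cell in the record.
+    // The shuffle partitions on the RowKey and sorts on all three key fields.
+    t._2.map { case (family, qualifier, value) =>
+      (new KeyFamilyQualifier(rowKey, family, qualifier), value)
+    }.iterator
+  },
+  stagingFolder.getPath)
+----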
+
+.Additional Parameters for Bulk Loading with Spark
+
+You can set the following attributes with additional parameter options on `hbaseBulkLoad`:
+
+* Max file size of the HFiles
+* A flag to exclude HFiles from compactions
+* Column Family settings for compression, bloomType, blockSize, and dataBlockEncoding
+
+.Using Additional Parameters
+====
+
+[source, scala]
+----
+val sc = new SparkContext("local", "test")
+val config = new HBaseConfiguration()
+
+val hbaseContext = new HBaseContext(sc, config)
+
+val stagingFolder = ...
+
+val familyHBaseWriterOptions = new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions]
+val f1Options = new FamilyHFileWriteOptions("GZ", "ROW", 128, "PREFIX")
+
+familyHBaseWriterOptions.put(Bytes.toBytes("columnFamily1"), f1Options)
+
+rdd.hbaseBulkLoad(TableName.valueOf(tableName),
+  t => {
+    val rowKey = t._1
+    val family: Array[Byte] = t._2(0)._1
+    val qualifier = t._2(0)._2
+    val value = t._2(0)._3
+
+    val keyFamilyQualifier = new KeyFamilyQualifier(rowKey, family, qualifier)
+
+    Seq((keyFamilyQualifier, value)).iterator
+  },
+  stagingFolder.getPath,
+  familyHBaseWriterOptions,
+  compactionExclude = false,
+  HConstants.DEFAULT_MAX_FILE_SIZE)
+
+val load = new LoadIncrementalHFiles(config)
+load.doBulkLoad(new Path(stagingFolder.getPath),
+  conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
+----
+====
+
+== SparkSQL/DataFrames
+
+http://spark.apache.org/sql/[SparkSQL] is a subproject of Spark that supports
+SQL that will compute down to a Spark DAG. In addition, SparkSQL is a heavy user
+of DataFrames. DataFrames are like RDDs with schema information.
+
+The HBase-Spark module includes support for Spark SQL and DataFrames, which allows
+you to write SparkSQL directly on HBase tables. In addition, the HBase-Spark module
+will push down query filtering logic to HBase.
+
+=== Predicate Push Down
+
+There are two examples of predicate push down in the HBase-Spark implementation.
+The first example shows the push down of filtering logic on the RowKey. HBase-Spark
+will reduce the filters on RowKeys down to a set of Get and/or Scan commands.
+
+NOTE: The Scans are distributed scans, rather than a single client scan operation.
+
+If the query looks something like the following, the logic will push down and get
+the rows through 3 Gets and 0 Scans. We can do gets because all the operations
+are `equal` operations.
+
+[source, sql]
+----
+SELECT
+  KEY_FIELD,
+  B_FIELD,
+  A_FIELD
+FROM hbaseTmp
+WHERE (KEY_FIELD = 'get1' or KEY_FIELD = 'get2' or KEY_FIELD = 'get3')
+----
+
+Now let's look at an example where we will end up doing two scans on HBase.
+
+[source, sql]
+----
+SELECT
+  KEY_FIELD,
+  B_FIELD,
+  A_FIELD
+FROM hbaseTmp
+WHERE KEY_FIELD < 'get2' or KEY_FIELD > 'get3'
+----
+
+In this example we will get 0 Gets and 2 Scans. One scan will load everything
+from the first row in the table until "get2", and the second scan will get
+everything from "get3" until the last row in the table.
+
+The next query is a good example of a query with many range checks. However,
+the ranges overlap, so the code is smart enough to get the following data
+in a single scan that encompasses all the data asked for by the query.
+
+[source, sql]
+----
+SELECT
+  KEY_FIELD,
+  B_FIELD,
+  A_FIELD
+FROM hbaseTmp
+WHERE
+  (KEY_FIELD >= 'get1' and KEY_FIELD <= 'get3') or
+  (KEY_FIELD > 'get3' and KEY_FIELD <= 'get5')
+----
+
+The second example of push down functionality offered by the HBase-Spark module
+is the ability to push down filter logic for column and cell fields. Just like
+the RowKey logic, all query logic will be consolidated into the minimum number
+of range checks and equal checks, by sending a Filter object along with the Scan
+that carries information about the consolidated push down predicates.
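+
+As a rough illustration of what "a Filter object along with the Scan" means, the
+snippet below expresses the single predicate `B_FIELD < '3'` on the column `c:b`
+using the standard HBase client API. This is only a conceptual sketch; it is not
+the Filter that the HBase-Spark module itself constructs.
+
+[source, scala]
+----
+// Conceptually, the pushed-down column predicate travels with the Scan as a
+// Filter, so rows are filtered on the Region Servers rather than in Spark.
+val scan = new Scan()
+scan.setFilter(new SingleColumnValueFilter(
+  Bytes.toBytes("c"), Bytes.toBytes("b"),
+  CompareFilter.CompareOp.LESS, Bytes.toBytes("3")))
+----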
+
+.SparkSQL Code Example
+====
+This example shows how we can interact with HBase with SQL.
+
+[source, scala]
+----
+val sc = new SparkContext("local", "test")
+val config = new HBaseConfiguration()
+
+new HBaseContext(sc, config)
+val sqlContext = new SQLContext(sc)
+
+val df = sqlContext.load("org.apache.hadoop.hbase.spark",
+  Map("hbase.columns.mapping" ->
+    "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b",
+    "hbase.table" -> "t1"))
+
+df.registerTempTable("hbaseTmp")
+
+val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD FROM hbaseTmp " +
+  "WHERE " +
+  "(KEY_FIELD = 'get1' and B_FIELD < '3') or " +
+  "(KEY_FIELD >= 'get3' and B_FIELD = '8')").take(5)
----
+
+There are three major parts of this example that deserve explanation.
+
+The sqlContext.load function::
+  In the sqlContext.load function we see two
+  parameters. The first of these parameters points Spark to the HBase
+  DefaultSource class that will act as the interface between SparkSQL and HBase.
+
+A map of key value pairs::
+  In this example we have two keys in our map, `hbase.columns.mapping` and
+  `hbase.table`. The `hbase.table` key directs SparkSQL to use the given HBase table.
+  The `hbase.columns.mapping` key gives us the logic to translate HBase columns to
+  SparkSQL columns.
++
+The `hbase.columns.mapping` is a string that follows this format:
++
+----
+(SparkSQL.ColumnName) (SparkSQL.ColumnType) (HBase.ColumnFamily):(HBase.Qualifier)
+----
++
+In the example below we see the definition of three fields. Because KEY_FIELD has
+no ColumnFamily, it is the RowKey.
++
+----
+KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b
+----
+
+The registerTempTable function::
+  This is a SparkSQL function that allows us to be free of Scala when accessing
+  our HBase table directly with SQL, using the registered table name "hbaseTmp".
+
+The last major point to note in the example is the `sqlContext.sql` function, which
+allows the user to ask their questions in SQL, which will be pushed down to the
+DefaultSource code in the HBase-Spark module. The result of this command will be
+a DataFrame with the schema of KEY_FIELD and B_FIELD.
+====
\ No newline at end of file
--
2.3.8 (Apple Git-58)