diff --git a/src/main/asciidoc/_chapters/spark.adoc b/src/main/asciidoc/_chapters/spark.adoc
new file mode 100644
index 0000000..23e2257
--- /dev/null
+++ b/src/main/asciidoc/_chapters/spark.adoc
@@ -0,0 +1,443 @@
+////
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+////
+
+[[spark]]
+= HBase and Spark
+:doctype: book
+:numbered:
+:toc: left
+:icons: font
+:experimental:
+
+http://spark.apache.org/[Apache Spark] is a software framework used to process data in
+memory in a distributed manner.
+
+Spark itself is out of scope for this document; please refer to the Spark site for more
+information on the Spark project and subprojects. Here are some good Spark
+component references to read before reading this chapter:
+
+* http://spark.apache.org/docs/latest/[Spark Core Overview]
+* http://spark.apache.org/docs/latest/streaming-programming-guide.html[Spark Streaming Overview]
+* http://spark.apache.org/docs/latest/sql-programming-guide.html[Spark SQL and DataFrame Overview]
+
+Spark applications can be written in Scala, Java, Python, and now R. The HBase-Spark module
+currently supports Scala and Java. This chapter focuses on Scala examples because Scala is
+the primary language in which Spark itself is written.
+
+== Chapter Overview
+This chapter focuses on four main interaction points between Spark and HBase.
+Those interaction points are:
+
+Basic Spark:: The ability to have an HBase Connection at any point in your Spark DAG.
+Spark Streaming:: The ability to have an HBase Connection at any point in your Spark
+Streaming application.
+Spark Bulk Load:: The ability to write directly to HBase HFiles for bulk insertion into HBase.
+SparkSQL/DataFrames:: The ability to write SparkSQL that draws on tables that are
+represented in HBase.
+
+The following sections walk through examples of each of the interaction points just listed.
+
+== Basic Spark
+Here we will talk about Spark HBase integration at the lowest and simplest levels.
+All the other interaction points are built upon the concepts described here.
+
+At the root of all Spark and HBase integration is the HBaseContext. The HBaseContext
+takes in HBase configurations and pushes them to the Spark executors. This allows us
+to have an HBase Connection per Spark Executor in a static location.
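+
+Before the full example below, here is a minimal sketch of just that construction step.
+It assumes hbase-site.xml is on the classpath, that the HBase-Spark module supplies the
+HBaseContext class in the org.apache.hadoop.hbase.spark package, and an arbitrary
+application name:
+
+----
+import org.apache.hadoop.hbase.HBaseConfiguration
+import org.apache.hadoop.hbase.spark.HBaseContext
+import org.apache.spark.{SparkConf, SparkContext}
+
+// The HBaseContext wraps the SparkContext together with the HBase configuration;
+// the configuration is what gets broadcast to the executors.
+val sc = new SparkContext(new SparkConf().setAppName("hbase-spark-example"))
+val config = HBaseConfiguration.create()   // picks up hbase-site.xml from the classpath
+val hbaseContext = new HBaseContext(sc, config)
+----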
+
+Just for reference, Spark Executors can be on the same nodes as the Region Servers or
+on different nodes; there is no dependence on co-location. Think of every Spark
+Executor as a multi-threaded client application.
+
+NOTE: Having Executors on the same nodes as the Region Servers will reduce network traffic in
+some use cases but not in all. For example, when reading from HBase with distributed scans the
+code will make a best effort to co-locate the reader, but in a bulk put operation we are
+putting from the existing Spark partition nodes and may be putting to every Region Server.
+
+Because each Executor holds its Connection in a static location, any Spark Task running
+on that Executor can access the shared Connection object.
+
+Here is a simple example of how the HBaseContext can be used. In this example we are
+doing a foreachPartition on an RDD in Scala:
+
+----
+val sc = new SparkContext("local", "test")
+val config = new HBaseConfiguration()
+
+...
+
+val hbaseContext = new HBaseContext(sc, config)
+
+rdd.hbaseForeachPartition(hbaseContext, (it, conn) => {
+  val bufferedMutator = conn.getBufferedMutator(TableName.valueOf("t1"))
+  it.foreach((putRecord) => {
+    val put = new Put(putRecord._1)
+    putRecord._2.foreach((putValue) =>
+      put.addColumn(putValue._1, putValue._2, putValue._3))
+    bufferedMutator.mutate(put)
+  })
+  bufferedMutator.flush()
+  bufferedMutator.close()
+})
+----
+
+If Java is preferred over Scala, the code looks a little different but is still entirely
+possible, as we can see with this example:
+
+----
+JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+
+try {
+  List<byte[]> list = new ArrayList<>();
+  list.add(Bytes.toBytes("1"));
+  ...
+  list.add(Bytes.toBytes("5"));
+
+  JavaRDD<byte[]> rdd = jsc.parallelize(list);
+  Configuration conf = HBaseConfiguration.create();
+
+  JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+  hbaseContext.foreachPartition(rdd,
+    new VoidFunction<Tuple2<Iterator<byte[]>, Connection>>() {
+      public void call(Tuple2<Iterator<byte[]>, Connection> t)
+          throws Exception {
+        Table table = t._2().getTable(TableName.valueOf(tableName));
+        try {
+          BufferedMutator mutator =
+            t._2().getBufferedMutator(TableName.valueOf(tableName));
+          try {
+            // Walk the partition's row keys, issuing a Get for each one and a
+            // mutation through the BufferedMutator when the row exists.
+            while (t._1().hasNext()) {
+              byte[] b = t._1().next();
+              Result r = table.get(new Get(b));
+              if (r.getExists()) {
+                mutator.mutate(new Put(b));
+              }
+            }
+          } finally {
+            mutator.flush();
+            mutator.close();
+          }
+        } finally {
+          table.close();
+        }
+      }
+    });
+} finally {
+  jsc.stop();
+}
+----
+
+All functionality between Spark and HBase is supported both in Scala and in Java,
+with the exception of SparkSQL, which supports any language that is supported by
+Spark. For the remainder of this documentation we will focus on Scala examples.
+
+The examples above illustrate how to do a foreachPartition with a Connection. A number
+of other Spark base functions are supported out of the box:
+
+BulkPut:: For massively parallel sending of puts to HBase
+BulkDelete:: For massively parallel sending of deletes to HBase
+BulkGet:: For massively parallel sending of gets to HBase to create a new RDD
+MapPartition:: To do a Spark Map function with a Connection object to allow full
+access to HBase
+HBaseRDD:: To simplify a distributed scan to create an RDD
+
+Examples of all these functionalities can be found in the HBase-Spark module;
+a short sketch of one of them, bulkDelete, follows below.
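+
+To give a feel for the shape these helpers take, here is a sketch of bulkDelete over an RDD
+of row keys. The table name t1, the contents of the RDD, and the batch size of 4 are
+assumptions made for illustration; the function turns each record into a Delete and sends
+the Deletes to HBase in batches.
+
+----
+// Assumes rdd is an RDD[Array[Byte]] of row keys and hbaseContext was built
+// as in the examples above.
+rdd.hbaseBulkDelete(hbaseContext,
+  TableName.valueOf("t1"),
+  rowKey => new Delete(rowKey),  // build a Delete from each record
+  4)                             // number of Deletes sent per round trip
+----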
+
+== Spark Streaming
+http://spark.apache.org/streaming/[Spark Streaming] is a micro-batching stream processing
+framework built on top of Spark. HBase and Spark Streaming make great companions in that
+HBase can provide the following benefits alongside Spark Streaming:
+
+* A place to grab reference data or profile data on the fly
+* A place to store counts or aggregates in a way that supports Spark Streaming's
+promise of exactly-once processing
+
+The HBase-Spark module's integration points with Spark Streaming are very similar to its
+normal Spark integration points. Specifically, you can interact with the following
+HBase-centric functions directly from a Spark Streaming DStream:
+
+BulkPut:: For parallel sending of puts to HBase
+BulkDelete:: For parallel sending of deletes to HBase
+BulkGet:: For parallel sending of gets to HBase to create a new RDD
+ForeachPartition:: To do a Spark Foreach function with a Connection object to allow full
+access to HBase
+MapPartitions:: To do a Spark Map function with a Connection object to allow full access
+to HBase
+
+Below is an example of bulkPut with DStreams; as you will see, it is very close in feel
+to the RDD bulkPut.
+
+----
+val sc = new SparkContext("local", "test")
+val config = new HBaseConfiguration()
+
+val hbaseContext = new HBaseContext(sc, config)
+val ssc = new StreamingContext(sc, Milliseconds(200))
+
+val rdd1 = ...
+val rdd2 = ...
+
+val queue = mutable.Queue[RDD[(Array[Byte], Array[(Array[Byte],
+    Array[Byte], Array[Byte])])]]()
+
+queue += rdd1
+queue += rdd2
+
+val dStream = ssc.queueStream(queue)
+
+dStream.hbaseBulkPut(
+  hbaseContext,
+  TableName.valueOf(tableName),
+  (putRecord) => {
+    val put = new Put(putRecord._1)
+    putRecord._2.foreach((putValue) =>
+      put.addColumn(putValue._1, putValue._2, putValue._3))
+    put
+  })
+----
+
+There are three inputs to the hbaseBulkPut function:
+
+. The HBaseContext, which carries the broadcast configuration information that links us
+to the HBase Connections in the executors
+. The TableName of the table we are putting data into
+. A function that will convert a record in the DStream into an HBase Put object
+
+== Bulk Load
+Spark bulk load follows the MapReduce implementation of bulk load very closely.
+In short, a partitioner partitions based on region splits and the row keys are
+sent to the reducers in order so that HFiles can be written out directly. In Spark terms,
+the bulk load is implemented around a repartitionAndSortWithinPartitions followed by a
+foreachPartition.
+
+The only major difference between the Spark implementation and the MapReduce
+implementation is that the column qualifier is included in the shuffle ordering process.
+This was done because the MapReduce bulk load implementation would have memory
+issues when loading rows with a large number of columns, as described in
+https://issues.apache.org/jira/browse/HBASE-7743[HBASE-7743]. That memory issue was a
+result of the sorting of those columns being done in the memory of the reducer JVM.
+
+Now that the ordering is done in the Spark shuffle, there should no longer be a limit on
+the number of columns in a row for bulk loading.
+
+Below is simple code showing what bulk loading with Spark looks like:
+
+----
+val sc = new SparkContext("local", "test")
+val config = new HBaseConfiguration()
+
+val hbaseContext = new HBaseContext(sc, config)
+
+val stagingFolder = ...
+
+rdd.hbaseBulkLoad(TableName.valueOf(tableName),
+  t => {
+    val rowKey = t._1
+    val family:Array[Byte] = t._2(0)._1
+    val qualifier = t._2(0)._2
+    val value = t._2(0)._3
+
+    val keyFamilyQualifier = new KeyFamilyQualifier(rowKey, family, qualifier)
+
+    Seq((keyFamilyQualifier, value)).iterator
+  },
+  stagingFolder.getPath)
+
+val load = new LoadIncrementalHFiles(config)
+load.doBulkLoad(new Path(stagingFolder.getPath),
+  conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
+----
+
+As you can see in this example, the hbaseBulkLoad function takes three parameters:
+
+. The TableName of the table we intend to bulk load into
+. A function that will convert a record in the RDD into a 2-tuple, with the first element
+being a KeyFamilyQualifier object that gives the location in HBase and the second element
+being the value we wish to store there.
+The KeyFamilyQualifier object holds the RowKey, Column Family, and Column Qualifier.
+The shuffle will partition on the RowKey but will sort by all three values (a sketch of
+the expected RDD element shape follows below).
+. The temporary path for the HFiles to be written out to
+
+Then, following the Spark bulk load command, we use HBase's LoadIncrementalHFiles object
+to load the newly created HFiles into HBase.
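+
+The bulk load examples in this section assume an RDD whose elements are
+(rowKey, Array((columnFamily, qualifier, value))) tuples; that is what the t._1 and t._2(0)
+accesses above unpack. A minimal sketch of building such an RDD, with a made-up column
+family and made-up values, might look like this:
+
+----
+// Illustrative only: three rows, one cell each, in a hypothetical column family "cf".
+val rdd = sc.parallelize(Array(
+  (Bytes.toBytes("1"),
+    Array((Bytes.toBytes("cf"), Bytes.toBytes("a"), Bytes.toBytes("value1")))),
+  (Bytes.toBytes("2"),
+    Array((Bytes.toBytes("cf"), Bytes.toBytes("a"), Bytes.toBytes("value2")))),
+  (Bytes.toBytes("3"),
+    Array((Bytes.toBytes("cf"), Bytes.toBytes("a"), Bytes.toBytes("value3"))))))
+----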
+
+There are also advanced options for bulk load with Spark. The following attributes can be
+set with additional parameters on hbaseBulkLoad:
+
+. Max file size of the HFiles
+. A flag to exclude HFiles from compactions
+. Column Family settings for compression, bloomType, blockSize, and dataBlockEncoding
+
+Below is a code example of bulk load with these extra parameters:
+
+----
+val sc = new SparkContext("local", "test")
+val config = new HBaseConfiguration()
+
+val hbaseContext = new HBaseContext(sc, config)
+
+val stagingFolder = ...
+
+val familyHBaseWriterOptions = new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions]
+val f1Options = new FamilyHFileWriteOptions("GZ", "ROW", 128, "PREFIX")
+
+familyHBaseWriterOptions.put(Bytes.toBytes("columnFamily1"), f1Options)
+
+rdd.hbaseBulkLoad(TableName.valueOf(tableName),
+  t => {
+    val rowKey = t._1
+    val family:Array[Byte] = t._2(0)._1
+    val qualifier = t._2(0)._2
+    val value = t._2(0)._3
+
+    val keyFamilyQualifier = new KeyFamilyQualifier(rowKey, family, qualifier)
+
+    Seq((keyFamilyQualifier, value)).iterator
+  },
+  stagingFolder.getPath,
+  familyHBaseWriterOptions,
+  compactionExclude = false,
+  HConstants.DEFAULT_MAX_FILE_SIZE)
+
+val load = new LoadIncrementalHFiles(config)
+load.doBulkLoad(new Path(stagingFolder.getPath),
+  conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
+----
+
+== SparkSQL/DataFrames
+
+http://spark.apache.org/sql/[SparkSQL] is a subproject of Spark that supports SQL which is
+computed down to a Spark DAG. In addition, SparkSQL is a heavy user of DataFrames.
+Think of DataFrames as RDDs with schema information.
+
+HBase's Spark support includes bindings for SparkSQL and DataFrames. For
+end users this allows writing SparkSQL directly against the data stored in HBase
+tables. Because HBase provides first-class support, query filtering logic can be
+pushed down into HBase's existing row and column filtering capabilities.
+
+=== Predicate Push Down
+There are two kinds of predicate push down in the HBase-Spark implementation.
+The first is the push down of filtering logic on the RowKey. HBase-Spark will
+reduce the filters on RowKeys down to a set of Get and/or Scan commands. Note that the
+Scans are distributed scans, not a single client scan operation.
+
+If the query looks something like the following, the logic will push down and fetch the
+rows through 3 Gets and 0 Scans. We can use Gets because all the predicates are equality
+operations.
+
+----
+SELECT
+  KEY_FIELD,
+  B_FIELD,
+  A_FIELD
+FROM hbaseTmp
+WHERE (KEY_FIELD = 'get1' or KEY_FIELD = 'get2' or KEY_FIELD = 'get3')
+----
+
+Now let's look at an example where we end up doing two Scans on HBase.
+
+----
+SELECT
+  KEY_FIELD,
+  B_FIELD,
+  A_FIELD
+FROM hbaseTmp
+WHERE KEY_FIELD < 'get2' or KEY_FIELD > 'get3'
+----
+
+In this example we get 0 Gets and 2 Scans. One scan will load everything from the
+first row in the table until "get2", and the second scan will load everything from "get3"
+until the last row in the table.
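+
+To make this concrete, here is how the two-scan query above would be issued from Scala.
+This assumes the hbaseTmp temporary table has already been registered as shown in the
+SparkSQL code example later in this section; the Get/Scan analysis happens inside the
+HBase-Spark DefaultSource, not in user code.
+
+----
+// Issued like any other SparkSQL query; the 2-Scan plan is an internal detail.
+val twoScanResults = sqlContext.sql(
+  "SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTmp " +
+  "WHERE KEY_FIELD < 'get2' or KEY_FIELD > 'get3'").collect()
+----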
+
+The next query is a good example of having a number of range checks. However, the ranges
+overlap, so the code is smart enough to fetch all the data asked for by the query in a
+single scan that encompasses both ranges.
+
+----
+SELECT
+  KEY_FIELD,
+  B_FIELD,
+  A_FIELD
+FROM hbaseTmp
+WHERE
+  (KEY_FIELD >= 'get1' and KEY_FIELD <= 'get3') or
+  (KEY_FIELD > 'get3' and KEY_FIELD <= 'get5')
+----
+
+The second kind of push down functionality offered by the HBase-Spark module is the
+ability to push down filter logic for column and cell fields. Just like the RowKey logic,
+all query logic is consolidated into the minimum number of range checks and equality
+checks. This is implemented by sending a Filter object along with the Scan that carries
+information about the consolidated push down predicates.
+
+=== SparkSQL Code Example
+The following is an example in Scala of how we can interact with HBase with SQL:
+
+----
+val sc = new SparkContext("local", "test")
+val config = new HBaseConfiguration()
+
+new HBaseContext(sc, config)
+val sqlContext = new SQLContext(sc)
+
+val df = sqlContext.load("org.apache.hadoop.hbase.spark",
+  Map("hbase.columns.mapping" ->
+    "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b",
+    "hbase.table" -> "t1"))
+
+df.registerTempTable("hbaseTmp")
+
+val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD FROM hbaseTmp " +
+  "WHERE " +
+  "(KEY_FIELD = 'get1' and B_FIELD < '3') or " +
+  "(KEY_FIELD >= 'get3' and B_FIELD = '8')").take(5)
+----
+
+There are three major parts of this example that deserve explaining. The first is the
+sqlContext.load function, which takes two parameters. The first parameter points Spark to
+the HBase DefaultSource class that acts as the interface between SparkSQL and HBase.
+
+The second parameter is a map of key value pairs. In this example the map has two keys:
+hbase.columns.mapping and hbase.table. The hbase.table key tells SparkSQL which HBase table
+to use. The hbase.columns.mapping key gives us the logic to translate HBase columns to
+SparkSQL columns.
+
+The hbase.columns.mapping value is a string that follows this format:
+
+----
+(SparkSQL.ColumnName) (SparkSQL.ColumnType) (HBase.ColumnFamily):(HBase.Qualifier)
+----
+
+In the example below we see the definition of three fields. Note that KEY_FIELD has no
+ColumnFamily; therefore it is the RowKey.
+
+----
+KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b
+----
+
+The next major thing to note in the example is the registerTempTable function. This
+SparkSQL function allows us to access the HBase table directly with SQL, free of Scala,
+under the table name "hbaseTmp".
+
+The last major point to note in the example is the sqlContext.sql function, which allows
+the user to ask their questions in SQL; those questions are pushed down to the
+DefaultSource code in the HBase-Spark module. The result of this command is a DataFrame
+with the schema of KEY_FIELD and B_FIELD.
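+
+Because the result is an ordinary DataFrame, the usual DataFrame operations apply to it.
+Below is a small, assumed continuation of the example above; the query and the column
+accessed are made up, and the output depends on the data actually stored in the table.
+
+----
+val df2 = sqlContext.sql("SELECT KEY_FIELD, B_FIELD FROM hbaseTmp " +
+  "WHERE KEY_FIELD >= 'get3'")
+
+df2.printSchema()                              // KEY_FIELD and B_FIELD, both strings
+df2.show(5)                                    // peek at up to five rows
+val keys = df2.collect().map(_.getString(0))   // pull the row keys back to the driver
+----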
diff --git a/src/main/asciidoc/book.adoc b/src/main/asciidoc/book.adoc
index d030c38..d82baf1 100644
--- a/src/main/asciidoc/book.adoc
+++ b/src/main/asciidoc/book.adoc
@@ -59,6 +59,7 @@ include::_chapters/shell.adoc[]
 include::_chapters/datamodel.adoc[]
 include::_chapters/schema_design.adoc[]
 include::_chapters/mapreduce.adoc[]
+include::_chapters/spark.adoc[]
 include::_chapters/security.adoc[]
 include::_chapters/architecture.adoc[]
 include::_chapters/hbase_mob.adoc[]