diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/datasources/AvroSource.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/datasources/AvroSource.scala
new file mode 100644
index 0000000..2880c5d
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/datasources/AvroSource.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark.example.datasources
+
+import org.apache.avro.Schema
+import org.apache.avro.generic.GenericData
+import org.apache.hadoop.hbase.spark.AvroSerdes
+import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.{SparkConf, SparkContext}
+
+/**
+ * @param col0 Column #0, Type is String
+ * @param col1 Column #1, Type is Array[Byte]
+ */
+case class AvroHBaseRecord(col0: String,
+                           col1: Array[Byte])
+
+object AvroHBaseRecord {
+  val schemaString =
+    s"""{"namespace": "example.avro",
+       | "type": "record", "name": "User",
+       | "fields": [
+       |   {"name": "name", "type": "string"},
+       |   {"name": "favorite_number", "type": ["int", "null"]},
+       |   {"name": "favorite_color", "type": ["string", "null"]},
+       |   {"name": "favorite_array", "type": {"type": "array", "items": "string"}},
+       |   {"name": "favorite_map", "type": {"type": "map", "values": "int"}}
+       | ] }""".stripMargin
+
+  val avroSchema: Schema = {
+    val p = new Schema.Parser
+    p.parse(schemaString)
+  }
+
+  def apply(i: Int): AvroHBaseRecord = {
+    val user = new GenericData.Record(avroSchema)
+    user.put("name", s"name${"%03d".format(i)}")
+    user.put("favorite_number", i)
+    user.put("favorite_color", s"color${"%03d".format(i)}")
+    val favoriteArray = new GenericData.Array[String](2, avroSchema.getField("favorite_array").schema())
+    favoriteArray.add(s"number${i}")
+    favoriteArray.add(s"number${i + 1}")
+    user.put("favorite_array", favoriteArray)
+    import collection.JavaConverters._
+    val favoriteMap = Map[String, Int](("key1" -> i), ("key2" -> (i + 1))).asJava
+    user.put("favorite_map", favoriteMap)
+    val avroByte = AvroSerdes.serialize(user, avroSchema)
+    AvroHBaseRecord(s"name${"%03d".format(i)}", avroByte)
+  }
+}
+
+object AvroSource {
+  def catalog = s"""{
+                   |"table":{"namespace":"default", "name":"ExampleAvrotable"},
+                   |"rowkey":"key",
+                   |"columns":{
+                   |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
+                   |"col1":{"cf":"cf1", "col":"col1", "type":"binary"}
+                   |}
+                   |}""".stripMargin
+
+  def avroCatalog = s"""{
+                       |"table":{"namespace":"default", "name":"ExampleAvrotable"},
+                       |"rowkey":"key",
+                       |"columns":{
+                       |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
+                       |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
+                       |}
+                       |}""".stripMargin
+
+  def avroCatalogInsert = s"""{
+                             |"table":{"namespace":"default", "name":"ExampleAvrotableInsert"},
+                             |"rowkey":"key",
+                             |"columns":{
+                             |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
+                             |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
+                             |}
+                             |}""".stripMargin
+
+  def main(args: Array[String]) {
+    val sparkConf = new SparkConf().setAppName("AvroSourceExample")
+    val sc = new SparkContext(sparkConf)
+    val sqlContext = new SQLContext(sc)
+
+    import sqlContext.implicits._
+
+    // Read through whichever catalog the caller passes in, so the same helper
+    // serves both the Avro-decoding catalog and the insert catalog.
+    def withCatalog(cat: String): DataFrame = {
+      sqlContext
+        .read
+        .options(Map("avroSchema" -> AvroHBaseRecord.schemaString, HBaseTableCatalog.tableCatalog -> cat))
+        .format("org.apache.hadoop.hbase.spark")
+        .load()
+    }
+
+    val data = (0 to 255).map { i =>
+      AvroHBaseRecord(i)
+    }
+
+    // Write with the binary catalog: col1 stores the serialized Avro bytes.
+    sc.parallelize(data).toDF.write.options(
+      Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
+      .format("org.apache.hadoop.hbase.spark")
+      .save()
+
+    // Read back with the Avro catalog so col1 is decoded against avroSchema.
+    val df = withCatalog(avroCatalog)
+    df.show
+    df.printSchema()
+    df.registerTempTable("ExampleAvrotable")
+    val c = sqlContext.sql("select count(1) from ExampleAvrotable")
+    c.show
+
+    val filtered = df.select($"col0", $"col1.favorite_array").where($"col0" === "name001")
+    filtered.show
+    val collected = filtered.collect()
+    if (collected(0).getSeq[String](1)(0) != "number1") {
+      throw new UserCustomizedSampleException("value invalid")
+    }
+    if (collected(0).getSeq[String](1)(1) != "number2") {
+      throw new UserCustomizedSampleException("value invalid")
+    }
+
+    df.write.options(
+      Map("avroSchema" -> AvroHBaseRecord.schemaString, HBaseTableCatalog.tableCatalog -> avroCatalogInsert,
+        HBaseTableCatalog.newTable -> "5"))
+      .format("org.apache.hadoop.hbase.spark")
+      .save()
+    val newDF = withCatalog(avroCatalogInsert)
+    newDF.show
+    newDF.printSchema()
+    if (newDF.count() != 256) {
+      throw new UserCustomizedSampleException("value invalid")
+    }
+
+    df.filter($"col1.name" === "name005" || $"col1.name" <= "name005")
+      .select("col0", "col1.favorite_color", "col1.favorite_number")
+      .show
+
+    df.filter($"col1.name" <= "name005" || $"col1.name".contains("name007"))
+      .select("col0", "col1.favorite_color", "col1.favorite_number")
+      .show
+  }
+}
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/datasources/DataType.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/datasources/DataType.scala
new file mode 100644
index 0000000..5839bf7
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/datasources/DataType.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark.example.datasources
+
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog
+
+class UserCustomizedSampleException(message: String = null, cause: Throwable = null) extends
+  RuntimeException(UserCustomizedSampleException.message(message, cause), cause)
+
+object UserCustomizedSampleException {
+  def message(message: String, cause: Throwable) =
+    if (message != null) message
+    else if (cause != null) cause.toString()
+    else null
+}
+
+case class IntKeyRecord(
+  col0: Integer,
+  col1: Boolean,
+  col2: Double,
+  col3: Float,
+  col4: Int,
+  col5: Long,
+  col6: Short,
+  col7: String,
+  col8: Byte)
+
+object IntKeyRecord {
+  def apply(i: Int): IntKeyRecord = {
+    IntKeyRecord(if (i % 2 == 0) i else -i,
+      i % 2 == 0,
+      i.toDouble,
+      i.toFloat,
+      i,
+      i.toLong,
+      i.toShort,
+      s"String$i extra",
+      i.toByte)
+  }
+}
+
+object DataType {
+  val cat = s"""{
+               |"table":{"namespace":"default", "name":"DataTypeExampleTable"},
+               |"rowkey":"key",
+               |"columns":{
+               |"col0":{"cf":"rowkey", "col":"key", "type":"int"},
+               |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
+               |"col2":{"cf":"cf2", "col":"col2", "type":"double"},
+               |"col3":{"cf":"cf3", "col":"col3", "type":"float"},
+               |"col4":{"cf":"cf4", "col":"col4", "type":"int"},
+               |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
+               |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
+               |"col7":{"cf":"cf7", "col":"col7", "type":"string"},
+               |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
+               |}
+               |}""".stripMargin
+
+  def main(args: Array[String]) {
+    val sparkConf = new SparkConf().setAppName("DataTypeExample")
+    val sc = new SparkContext(sparkConf)
+    val sqlContext = new SQLContext(sc)
+
+    import sqlContext.implicits._
+
+    def withCatalog(cat: String): DataFrame = {
+      sqlContext
+        .read
+        .options(Map(HBaseTableCatalog.tableCatalog -> cat))
+        .format("org.apache.hadoop.hbase.spark")
+        .load()
+    }
+
+    // populate the table
+    val data = (0 until 32).map { i =>
+      IntKeyRecord(i)
+    }
+    sc.parallelize(data).toDF.write.options(
+      Map(HBaseTableCatalog.tableCatalog -> cat, HBaseTableCatalog.newTable -> "5"))
+      .format("org.apache.hadoop.hbase.spark")
+      .save()
+
+    // test "less than 0". The number of results is 16
+    val df = withCatalog(cat)
+    val s = df.filter($"col0" < 0)
+    s.show
+    if (s.count() != 16) {
+      throw new UserCustomizedSampleException("value invalid")
+    }
+
+    // test "less than or equal to -10". The number of results is 11
+    val num1 = df.filter($"col0" <= -10)
+    num1.show
+    val c1 = num1.count()
+    println(s"test result count should be 11: $c1")
+
+    // test "less than or equal to -9". The number of results is 12
+    val num2 = df.filter($"col0" <= -9)
+    num2.show
+    val c2 = num2.count()
+    println(s"test result count should be 12: $c2")
+
+    // test "greater than or equal to -9". The number of results is 21
+    val num3 = df.filter($"col0" >= -9)
+    num3.show
+    val c3 = num3.count()
+    println(s"test result count should be 21: $c3")
+
+    // test "greater than or equal to 0". The number of results is 16
+    val num4 = df.filter($"col0" >= 0)
+    num4.show
+    val c4 = num4.count()
+    println(s"test result count should be 16: $c4")
+
+    // test "greater than 10". The number of results is 10
+    val num5 = df.filter($"col0" > 10)
+    num5.show
+    val c5 = num5.count()
+    println(s"test result count should be 10: $c5")
+
+    // test "and". The number of results is 11
+    val num6 = df.filter($"col0" > -10 && $"col0" <= 10)
+    num6.show
+    val c6 = num6.count()
+    println(s"test result count should be 11: $c6")
+
+    // test "or". The number of results is 21
+    val num7 = df.filter($"col0" <= -10 || $"col0" > 10)
+    num7.show
+    val c7 = num7.count()
+    println(s"test result count should be 21: $c7")
+
+    // test "all". The number of results is 32
+    val num8 = df.filter($"col0" >= -100)
+    num8.show
+    val c8 = num8.count()
+    println(s"test result count should be 32: $c8")
+
+    // test "full query"
+    val df1 = withCatalog(cat)
+    df1.show()
+    val c_df = df1.count()
+    println(s"df count should be 32: $c_df")
+    if (c_df != 32) {
+      throw new UserCustomizedSampleException("value invalid")
+    }
+  }
+}
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/datasources/HBaseSource.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/datasources/HBaseSource.scala
new file mode 100644
index 0000000..ed23990
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/datasources/HBaseSource.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark.example.datasources
+
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog
+
+case class HBaseRecord(
+  col0: String,
+  col1: Boolean,
+  col2: Double,
+  col3: Float,
+  col4: Int,
+  col5: Long,
+  col6: Short,
+  col7: String,
+  col8: Byte)
+
+object HBaseRecord {
+  def apply(i: Int): HBaseRecord = {
+    val s = s"""row${"%03d".format(i)}"""
+    HBaseRecord(s,
+      i % 2 == 0,
+      i.toDouble,
+      i.toFloat,
+      i,
+      i.toLong,
+      i.toShort,
+      s"String$i extra",
+      i.toByte)
+  }
+}
+
+object HBaseSource {
+  val cat = s"""{
+               |"table":{"namespace":"default", "name":"HBaseSourceExampleTable"},
+               |"rowkey":"key",
+               |"columns":{
+               |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
+               |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
+               |"col2":{"cf":"cf2", "col":"col2", "type":"double"},
+               |"col3":{"cf":"cf3", "col":"col3", "type":"float"},
+               |"col4":{"cf":"cf4", "col":"col4", "type":"int"},
+               |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
+               |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
+               |"col7":{"cf":"cf7", "col":"col7", "type":"string"},
+               |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
+               |}
+               |}""".stripMargin
+
+  def main(args: Array[String]) {
+    val sparkConf = new SparkConf().setAppName("HBaseSourceExample")
+    val sc = new SparkContext(sparkConf)
+    val sqlContext = new SQLContext(sc)
+
+    import sqlContext.implicits._
+
+    def withCatalog(cat: String): DataFrame = {
+      sqlContext
+        .read
+        .options(Map(HBaseTableCatalog.tableCatalog -> cat))
+        .format("org.apache.hadoop.hbase.spark")
+        .load()
+    }
+
+    val data = (0 to 255).map { i =>
+      HBaseRecord(i)
+    }
+
+    sc.parallelize(data).toDF.write.options(
+      Map(HBaseTableCatalog.tableCatalog -> cat, HBaseTableCatalog.newTable -> "5"))
+      .format("org.apache.hadoop.hbase.spark")
+      .save()
+
+    val df = withCatalog(cat)
+    df.show
+    df.filter($"col0" <= "row005")
+      .select($"col0", $"col1").show
+    df.filter($"col0" === "row005" || $"col0" <= "row005")
+      .select($"col0", $"col1").show
+    df.filter($"col0" > "row250")
+      .select($"col0", $"col1").show
+    df.registerTempTable("table1")
+    val c = sqlContext.sql("select count(col1) from table1 where col0 < 'row050'")
+    c.show()
+  }
+}