diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadMultiVersionCellSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadMultiVersionCellSuite.scala
new file mode 100644
index 0000000..4e25c46
--- /dev/null
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadMultiVersionCellSuite.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.{CellUtil, HBaseTestingUtility, TableName}
+import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
+import org.apache.spark.{Logging, SparkContext}
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
+
+import scala.collection.mutable
+
+class BulkLoadMultiVersionCellSuite extends FunSuite with
+  BeforeAndAfterEach with BeforeAndAfterAll with Logging {
+
+  @transient var sc: SparkContext = null
+
+  var TEST_UTIL: HBaseTestingUtility = new HBaseTestingUtility
+
+  val tableName = "t1"
+  val columnFamily = "c"
+  val numVersions = 3
+  val columnName = "log"
+
+  override def beforeAll() {
+    TEST_UTIL.startMiniCluster()
+    logInfo(" - minicluster started")
+
+    try {
+      TEST_UTIL.deleteTable(TableName.valueOf(tableName))
+    } catch {
+      case e: Exception => logInfo(" - no table " + tableName + " found")
+    }
+
+    logInfo(" - creating table " + tableName)
+    // Keep enough versions on the column family to retain every
+    // timestamped cell this test writes.
+    TEST_UTIL.createTable(TableName.valueOf(tableName), Bytes.toBytes(columnFamily), numVersions)
+    logInfo(" - created table")
+
+    sc = new SparkContext("local", "test")
+  }
+
+  override def afterAll() {
+    TEST_UTIL.deleteTable(TableName.valueOf(tableName))
+    logInfo("shutting down minicluster")
+    TEST_UTIL.shutdownMiniCluster()
+    sc.stop()
+  }
+
+  // Two values for the same row and column, captured at distinct
+  // timestamps so they land in HBase as two versions of one cell.
+  def generateData(): Seq[(String, String, Long)] = {
+    val arr = mutable.ArrayBuffer[(String, String, Long)]()
+    for (i <- 1 to 2) {
+      arr += (("id", s"log$i", System.currentTimeMillis()))
+      // Sleep so consecutive records carry strictly increasing timestamps.
+      Thread.sleep(1000)
+    }
+    arr.toSeq
+  }
+
+  test("bulkLoad multi-version cell to test HBase client") {
+    val stagingFolder = "/tmp/bulkload/test"
+
+    val data = generateData()
+    val rdd = sc.parallelize(data)
+
+    val hbaseConf = TEST_UTIL.getConfiguration
+    val connection = ConnectionFactory.createConnection(hbaseConf)
+    val hbaseContext = new HBaseContext(sc, hbaseConf)
+
+    import org.apache.hadoop.hbase.spark.HBaseRDDFunctions.GenericHBaseRDDFunctions
+
+    // Write HFiles into the staging folder. Carrying the record's
+    // timestamp into the KeyFamilyQualifier is what turns the two
+    // records into distinct versions of the same row/column.
+    rdd.hbaseBulkLoad(
+      hbaseContext,
+      TableName.valueOf(tableName),
+      t => {
+        val (id, log, timestamp) = t
+
+        val rowKey = Bytes.toBytes(id)
+        val cf = Bytes.toBytes(columnFamily)
+        val column = Bytes.toBytes(columnName)
+        val value = Bytes.toBytes(log)
+
+        val keyFamilyQualifier = new KeyFamilyQualifier(rowKey, cf, column, timestamp)
+        Seq((keyFamilyQualifier, value)).iterator
+      },
+      stagingFolder
+    )
+
+    // Move the generated HFiles into the live regions.
+    val bulkLoader = new LoadIncrementalHFiles(hbaseConf)
+    bulkLoader.doBulkLoad(
+      new Path(stagingFolder),
+      connection.getAdmin,
+      connection.getTable(TableName.valueOf(tableName)),
+      connection.getRegionLocator(TableName.valueOf(tableName))
+    )
+
+    val table = connection.getTable(TableName.valueOf(tableName))
+    try {
+      // A Get returns only the newest version by default, so ask for all
+      // retained versions; getColumnCells lists them newest first.
+      val get = new Get(Bytes.toBytes("id")).setMaxVersions(numVersions)
+      val list = table.get(get)
+        .getColumnCells(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName))
+
+      val cellV2 = Bytes.toString(CellUtil.cloneValue(list.get(0)))
+      val cellV1 = Bytes.toString(CellUtil.cloneValue(list.get(1)))
+
+      assert(cellV2 == data(1)._2)
+      assert(cellV1 == data(0)._2)
+    } finally {
+      table.close()
+      connection.close()
+    }
+  }
+}
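
A note on the four-argument KeyFamilyQualifier constructor used in the test:
the stock class in org.apache.hadoop.hbase.spark carries only rowKey, family,
and qualifier, so the test assumes a variant extended with a cell timestamp by
the accompanying patch. A minimal sketch of such an extension (hypothetical,
not the committed implementation) that keeps the stock row/family/qualifier
ordering contract could look like this:

    import org.apache.hadoop.hbase.HConstants
    import org.apache.hadoop.hbase.util.Bytes

    // Hypothetical four-field KeyFamilyQualifier: the timestamp rides along
    // with each record but stays out of the ordering the HFile writer uses.
    class KeyFamilyQualifier(val rowKey: Array[Byte],
                             val family: Array[Byte],
                             val qualifier: Array[Byte],
                             val timestamp: Long = HConstants.LATEST_TIMESTAMP)
      extends Comparable[KeyFamilyQualifier] with Serializable {

      // Sort by row, then family, then qualifier, matching the stock class.
      override def compareTo(o: KeyFamilyQualifier): Int = {
        var result = Bytes.compareTo(rowKey, o.rowKey)
        if (result == 0) {
          result = Bytes.compareTo(family, o.family)
          if (result == 0) {
            result = Bytes.compareTo(qualifier, o.qualifier)
          }
        }
        result
      }

      override def toString: String =
        Bytes.toString(rowKey) + ":" + Bytes.toString(family) + ":" +
          Bytes.toString(qualifier) + ":" + timestamp
    }

With a default of HConstants.LATEST_TIMESTAMP, existing three-argument callers
keep today's behavior, while callers such as this test can pin each cell to an
explicit version.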