From 9886cc1ba2ca5a7122e2b33fef45fdc66eac8de7 Mon Sep 17 00:00:00 2001
From: Sean Busbey
Date: Mon, 7 Sep 2015 09:51:47 -0500
Subject: [PATCH] HBASE-14377 JavaHBaseContextSuite isn't being run.

---
 hbase-spark/pom.xml                                |  10 +-
 .../hadoop/hbase/spark/JavaHBaseContextSuite.java  | 334 --------------------
 .../hadoop/hbase/spark/TestJavaHBaseContext.java   | 338 +++++++++++++++++++++
 3 files changed, 347 insertions(+), 335 deletions(-)
 delete mode 100644 hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/JavaHBaseContextSuite.java
 create mode 100644 hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java

diff --git a/hbase-spark/pom.xml b/hbase-spark/pom.xml
index 8110629..0af80ba 100644
--- a/hbase-spark/pom.xml
+++ b/hbase-spark/pom.xml
@@ -40,6 +40,7 @@
     <spark.version>1.3.0</spark.version>
     <scala.version>2.10.4</scala.version>
     <scala.binary.version>2.10</scala.binary.version>
+    <surefire.skipFirstPart>true</surefire.skipFirstPart>
    <top.dir>${project.basedir}/..</top.dir>
  </properties>
  <dependencies>
@@ -331,6 +332,14 @@

    <dependency>
      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-annotations</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-hadoop-compat</artifactId>
      <version>${project.version}</version>
      <scope>test</scope>
@@ -507,7 +516,6 @@
    </dependency>
  </dependencies>
  <build>
-    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
diff --git a/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/JavaHBaseContextSuite.java b/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/JavaHBaseContextSuite.java
deleted file mode 100644
index f19ad10..0000000
--- a/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/JavaHBaseContextSuite.java
+++ /dev/null
@@ -1,334 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.spark;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-
-import java.util.*;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.spark.example.hbasecontext.JavaHBaseBulkDeleteExample;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.spark.api.java.*;
-import org.apache.spark.api.java.function.Function;
-import org.junit.*;
-
-import scala.Tuple2;
-
-import com.google.common.io.Files;
-
-public class JavaHBaseContextSuite implements Serializable {
-  private transient JavaSparkContext jsc;
-  HBaseTestingUtility htu;
-  protected static final Log LOG = LogFactory.getLog(JavaHBaseContextSuite.class);
-
-  byte[] tableName = Bytes.toBytes("t1");
-  byte[] columnFamily = Bytes.toBytes("c");
-  String columnFamilyStr = Bytes.toString(columnFamily);
-
-  @Before
-  public void setUp() {
-    jsc = new JavaSparkContext("local", "JavaHBaseContextSuite");
-    jsc.addJar("spark.jar");
-
-    File tempDir = Files.createTempDir();
-    tempDir.deleteOnExit();
-
-    htu = HBaseTestingUtility.createLocalHTU();
-    try {
-      LOG.info("cleaning up test dir");
-
-      htu.cleanupTestDir();
-
-      LOG.info("starting minicluster");
-
-      htu.startMiniZKCluster();
-      htu.startMiniHBaseCluster(1, 1);
-
-      LOG.info(" - minicluster started");
-
-      try {
-        htu.deleteTable(TableName.valueOf(tableName));
-      } catch (Exception e) {
-        LOG.info(" - no table " + Bytes.toString(tableName) + " found");
-      }
-
-      LOG.info(" - creating table " + Bytes.toString(tableName));
-      htu.createTable(TableName.valueOf(tableName),
-          columnFamily);
-      LOG.info(" - created table");
-    } catch (Exception e1) {
-      throw new RuntimeException(e1);
-    }
-  }
-
-  @After
-  public void tearDown() {
-    try {
-      htu.deleteTable(TableName.valueOf(tableName));
-      LOG.info("shutting down minicluster");
-      htu.shutdownMiniHBaseCluster();
-      htu.shutdownMiniZKCluster();
-      LOG.info(" - minicluster shut down");
-      htu.cleanupTestDir();
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-    jsc.stop();
-    jsc = null;
-  }
-
-  @Test
-  public void testBulkPut() throws IOException {
-
-    List<String> list = new ArrayList<>();
-    list.add("1," + columnFamilyStr + ",a,1");
-    list.add("2," + columnFamilyStr + ",a,2");
-    list.add("3," + columnFamilyStr + ",a,3");
-    list.add("4," + columnFamilyStr + ",a,4");
-    list.add("5," + columnFamilyStr + ",a,5");
-
-    JavaRDD<String> rdd = jsc.parallelize(list);
-
-    Configuration conf = htu.getConfiguration();
-
-    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
-    Connection conn = ConnectionFactory.createConnection(conf);
-    Table table = conn.getTable(TableName.valueOf(tableName));
-
-    try {
-      List<Delete> deletes = new ArrayList<>();
-      for (int i = 1; i < 6; i++) {
-        deletes.add(new Delete(Bytes.toBytes(Integer.toString(i))));
-      }
-      table.delete(deletes);
-    } finally {
-      table.close();
-    }
-
-    hbaseContext.bulkPut(rdd,
-        TableName.valueOf(tableName),
-        new PutFunction());
-
-    table = conn.getTable(TableName.valueOf(tableName));
-
-    try {
-      Result result1 = table.get(new Get(Bytes.toBytes("1")));
-      Assert.assertNotNull("Row 1 should have been added", result1.getRow());
-
-      Result result2 = table.get(new Get(Bytes.toBytes("2")));
-      Assert.assertNotNull("Row 2 should have been added", result2.getRow());
-
-      Result result3 = table.get(new Get(Bytes.toBytes("3")));
-      Assert.assertNotNull("Row 3 should have been added", result3.getRow());
-
-      Result result4 = table.get(new Get(Bytes.toBytes("4")));
-      Assert.assertNotNull("Row 4 should have been added", result4.getRow());
-
-      Result result5 = table.get(new Get(Bytes.toBytes("5")));
-      Assert.assertNotNull("Row 5 should have been added", result5.getRow());
-    } finally {
-      table.close();
-      conn.close();
-    }
-  }
-
-  public static class PutFunction implements Function<String, Put> {
-
-    private static final long serialVersionUID = 1L;
-
-    public Put call(String v) throws Exception {
-      String[] cells = v.split(",");
-      Put put = new Put(Bytes.toBytes(cells[0]));
-
-      put.addColumn(Bytes.toBytes(cells[1]), Bytes.toBytes(cells[2]),
-          Bytes.toBytes(cells[3]));
-      return put;
-    }
-  }
-
-  @Test
-  public void testBulkDelete() throws IOException {
-    List<byte[]> list = new ArrayList<>();
-    list.add(Bytes.toBytes("1"));
-    list.add(Bytes.toBytes("2"));
-    list.add(Bytes.toBytes("3"));
-
-    JavaRDD<byte[]> rdd = jsc.parallelize(list);
-
-    Configuration conf = htu.getConfiguration();
-
-    populateTableWithMockData(conf, TableName.valueOf(tableName));
-
-    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
-    hbaseContext.bulkDelete(rdd, TableName.valueOf(tableName),
-        new JavaHBaseBulkDeleteExample.DeleteFunction(), 2);
-
-    try (
-        Connection conn = ConnectionFactory.createConnection(conf);
-        Table table = conn.getTable(TableName.valueOf(tableName))
-    ) {
-      Result result1 = table.get(new Get(Bytes.toBytes("1")));
-      Assert.assertNull("Row 1 should have been deleted", result1.getRow());
-
-      Result result2 = table.get(new Get(Bytes.toBytes("2")));
-      Assert.assertNull("Row 2 should have been deleted", result2.getRow());
-
-      Result result3 = table.get(new Get(Bytes.toBytes("3")));
-      Assert.assertNull("Row 3 should have been deleted", result3.getRow());
-
-      Result result4 = table.get(new Get(Bytes.toBytes("4")));
-      Assert.assertNotNull("Row 4 should not have been deleted", result4.getRow());
-
-      Result result5 = table.get(new Get(Bytes.toBytes("5")));
-      Assert.assertNotNull("Row 5 should not have been deleted", result5.getRow());
-    }
-  }
-
-  @Test
-  public void testDistributedScan() throws IOException {
-    Configuration conf = htu.getConfiguration();
-
-    populateTableWithMockData(conf, TableName.valueOf(tableName));
-
-    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
-    Scan scan = new Scan();
-    scan.setCaching(100);
-
-    JavaRDD<String> javaRdd =
-        hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan)
-            .map(new ScanConvertFunction());
-
-    List<String> results = javaRdd.collect();
-
-    Assert.assertEquals(results.size(), 5);
-  }
-
-  private static class ScanConvertFunction implements
-      Function<Tuple2<ImmutableBytesWritable, Result>, String> {
-    @Override
-    public String call(Tuple2<ImmutableBytesWritable, Result> v1) throws Exception {
-      return Bytes.toString(v1._1().copyBytes());
-    }
-  }
-
-  @Test
-  public void testBulkGet() throws IOException {
-    List<byte[]> list = new ArrayList<>();
-    list.add(Bytes.toBytes("1"));
-    list.add(Bytes.toBytes("2"));
-    list.add(Bytes.toBytes("3"));
-    list.add(Bytes.toBytes("4"));
-    list.add(Bytes.toBytes("5"));
-
-    JavaRDD<byte[]> rdd = jsc.parallelize(list);
-
-    Configuration conf = htu.getConfiguration();
-
-    populateTableWithMockData(conf, TableName.valueOf(tableName));
-
-    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
-    final JavaRDD<String> stringJavaRDD =
-        hbaseContext.bulkGet(TableName.valueOf(tableName), 2, rdd,
-            new GetFunction(),
-            new ResultFunction());
-
-    Assert.assertEquals(stringJavaRDD.count(), 5);
-  }
-
-  public static class GetFunction implements Function<byte[], Get> {
-
-    private static final long serialVersionUID = 1L;
-
-    public Get call(byte[] v) throws Exception {
-      return new Get(v);
-    }
-  }
-
-  public static class ResultFunction implements Function<Result, String> {
-
-    private static final long serialVersionUID = 1L;
-
-    public String call(Result result) throws Exception {
-      Iterator<Cell> it = result.listCells().iterator();
-      StringBuilder b = new StringBuilder();
-
-      b.append(Bytes.toString(result.getRow())).append(":");
-
-      while (it.hasNext()) {
-        Cell cell = it.next();
-        String q = Bytes.toString(CellUtil.cloneQualifier(cell));
-        if ("counter".equals(q)) {
-          b.append("(")
-              .append(q)
-              .append(",")
-              .append(Bytes.toLong(CellUtil.cloneValue(cell)))
-              .append(")");
-        } else {
-          b.append("(")
-              .append(q)
-              .append(",")
-              .append(Bytes.toString(CellUtil.cloneValue(cell)))
-              .append(")");
-        }
-      }
-      return b.toString();
-    }
-  }
-
-  private void populateTableWithMockData(Configuration conf, TableName tableName)
-      throws IOException {
-    try (
-        Connection conn = ConnectionFactory.createConnection(conf);
-        Table table = conn.getTable(tableName)) {
-
-      List<Put> puts = new ArrayList<>();
-
-      for (int i = 1; i < 6; i++) {
-        Put put = new Put(Bytes.toBytes(Integer.toString(i)));
-        put.addColumn(columnFamily, columnFamily, columnFamily);
-        puts.add(put);
-      }
-      table.put(puts);
-    }
-  }
-
-}
\ No newline at end of file
diff --git a/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java b/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
new file mode 100644
index 0000000..724ac36
--- /dev/null
+++ b/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.spark;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+
+import java.util.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.spark.example.hbasecontext.JavaHBaseBulkDeleteExample;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.spark.api.java.*;
+import org.apache.spark.api.java.function.Function;
+import org.junit.*;
+import org.junit.experimental.categories.Category;
+
+import scala.Tuple2;
+
+import com.google.common.io.Files;
+
+@Category({MiscTests.class, MediumTests.class})
+public class TestJavaHBaseContext implements Serializable {
+  private transient JavaSparkContext jsc;
+  HBaseTestingUtility htu;
+  protected static final Log LOG = LogFactory.getLog(TestJavaHBaseContext.class);
+
+  byte[] tableName = Bytes.toBytes("t1");
+  byte[] columnFamily = Bytes.toBytes("c");
+  String columnFamilyStr = Bytes.toString(columnFamily);
+
+  @Before
+  public void setUp() {
+    jsc = new JavaSparkContext("local", "JavaHBaseContextSuite");
+    jsc.addJar("spark.jar");
+
+    File tempDir = Files.createTempDir();
+    tempDir.deleteOnExit();
+
+    htu = HBaseTestingUtility.createLocalHTU();
+    try {
+      LOG.info("cleaning up test dir");
+
+      htu.cleanupTestDir();
+
+      LOG.info("starting minicluster");
+
+      htu.startMiniZKCluster();
+      htu.startMiniHBaseCluster(1, 1);
+
+      LOG.info(" - minicluster started");
+
+      try {
+        htu.deleteTable(TableName.valueOf(tableName));
+      } catch (Exception e) {
+        LOG.info(" - no table " + Bytes.toString(tableName) + " found");
+      }
+
+      LOG.info(" - creating table " + Bytes.toString(tableName));
+      htu.createTable(TableName.valueOf(tableName),
+          columnFamily);
+      LOG.info(" - created table");
+    } catch (Exception e1) {
+      throw new RuntimeException(e1);
+    }
+  }
+
+  @After
+  public void tearDown() {
+    try {
+      htu.deleteTable(TableName.valueOf(tableName));
+      LOG.info("shutting down minicluster");
+      htu.shutdownMiniHBaseCluster();
+      htu.shutdownMiniZKCluster();
+      LOG.info(" - minicluster shut down");
+      htu.cleanupTestDir();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    jsc.stop();
+    jsc = null;
+  }
+
+  @Test
+  public void testBulkPut() throws IOException {
+
+    List<String> list = new ArrayList<>();
+    list.add("1," + columnFamilyStr + ",a,1");
+    list.add("2," + columnFamilyStr + ",a,2");
+    list.add("3," + columnFamilyStr + ",a,3");
+    list.add("4," + columnFamilyStr + ",a,4");
+    list.add("5," + columnFamilyStr + ",a,5");
+
+    JavaRDD<String> rdd = jsc.parallelize(list);
+
+    Configuration conf = htu.getConfiguration();
+
+    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+    Connection conn = ConnectionFactory.createConnection(conf);
+    Table table = conn.getTable(TableName.valueOf(tableName));
+
+    try {
+      List<Delete> deletes = new ArrayList<>();
+      for (int i = 1; i < 6; i++) {
+        deletes.add(new Delete(Bytes.toBytes(Integer.toString(i))));
+      }
+      table.delete(deletes);
+    } finally {
+      table.close();
+    }
+
+    hbaseContext.bulkPut(rdd,
+        TableName.valueOf(tableName),
+        new PutFunction());
+
+    table = conn.getTable(TableName.valueOf(tableName));
+
+    try {
+      Result result1 = table.get(new Get(Bytes.toBytes("1")));
+      Assert.assertNotNull("Row 1 should have been added", result1.getRow());
+
+      Result result2 = table.get(new Get(Bytes.toBytes("2")));
+      Assert.assertNotNull("Row 2 should have been added", result2.getRow());
+
+      Result result3 = table.get(new Get(Bytes.toBytes("3")));
+      Assert.assertNotNull("Row 3 should have been added", result3.getRow());
+
+      Result result4 = table.get(new Get(Bytes.toBytes("4")));
+      Assert.assertNotNull("Row 4 should have been added", result4.getRow());
+
+      Result result5 = table.get(new Get(Bytes.toBytes("5")));
+      Assert.assertNotNull("Row 5 should have been added", result5.getRow());
+    } finally {
+      table.close();
+      conn.close();
+    }
+  }
+
+  public static class PutFunction implements Function<String, Put> {
+
+    private static final long serialVersionUID = 1L;
+
+    public Put call(String v) throws Exception {
+      String[] cells = v.split(",");
+      Put put = new Put(Bytes.toBytes(cells[0]));
+
+      put.addColumn(Bytes.toBytes(cells[1]), Bytes.toBytes(cells[2]),
+          Bytes.toBytes(cells[3]));
+      return put;
+    }
+  }
+
+  @Test
+  public void testBulkDelete() throws IOException {
+    List<byte[]> list = new ArrayList<>();
+    list.add(Bytes.toBytes("1"));
+    list.add(Bytes.toBytes("2"));
+    list.add(Bytes.toBytes("3"));
+
+    JavaRDD<byte[]> rdd = jsc.parallelize(list);
+
+    Configuration conf = htu.getConfiguration();
+
+    populateTableWithMockData(conf, TableName.valueOf(tableName));
+
+    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+    hbaseContext.bulkDelete(rdd, TableName.valueOf(tableName),
+        new JavaHBaseBulkDeleteExample.DeleteFunction(), 2);
+
+    try (
+        Connection conn = ConnectionFactory.createConnection(conf);
+        Table table = conn.getTable(TableName.valueOf(tableName))
+    ) {
+      Result result1 = table.get(new Get(Bytes.toBytes("1")));
+      Assert.assertNull("Row 1 should have been deleted", result1.getRow());
+
+      Result result2 = table.get(new Get(Bytes.toBytes("2")));
+      Assert.assertNull("Row 2 should have been deleted", result2.getRow());
+
+      Result result3 = table.get(new Get(Bytes.toBytes("3")));
+      Assert.assertNull("Row 3 should have been deleted", result3.getRow());
+
+      Result result4 = table.get(new Get(Bytes.toBytes("4")));
+      Assert.assertNotNull("Row 4 should not have been deleted", result4.getRow());
+
+      Result result5 = table.get(new Get(Bytes.toBytes("5")));
+      Assert.assertNotNull("Row 5 should not have been deleted", result5.getRow());
+    }
+  }
+
+  @Test
+  public void testDistributedScan() throws IOException {
+    Configuration conf = htu.getConfiguration();
+
+    populateTableWithMockData(conf, TableName.valueOf(tableName));
+
+    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+    Scan scan = new Scan();
+    scan.setCaching(100);
+
+    JavaRDD<String> javaRdd =
+        hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan)
+            .map(new ScanConvertFunction());
+
+    List<String> results = javaRdd.collect();
+
+    Assert.assertEquals(results.size(), 5);
+  }
+
+  private static class ScanConvertFunction implements
+      Function<Tuple2<ImmutableBytesWritable, Result>, String> {
+    @Override
+    public String call(Tuple2<ImmutableBytesWritable, Result> v1) throws Exception {
+      return Bytes.toString(v1._1().copyBytes());
+    }
+  }
+
+  @Test
+  public void testBulkGet() throws IOException {
+    List<byte[]> list = new ArrayList<>();
+    list.add(Bytes.toBytes("1"));
+    list.add(Bytes.toBytes("2"));
+    list.add(Bytes.toBytes("3"));
+    list.add(Bytes.toBytes("4"));
+    list.add(Bytes.toBytes("5"));
+
+    JavaRDD<byte[]> rdd = jsc.parallelize(list);
+
+    Configuration conf = htu.getConfiguration();
+
+    populateTableWithMockData(conf, TableName.valueOf(tableName));
+
+    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+    final JavaRDD<String> stringJavaRDD =
+        hbaseContext.bulkGet(TableName.valueOf(tableName), 2, rdd,
+            new GetFunction(),
+            new ResultFunction());
+
+    Assert.assertEquals(stringJavaRDD.count(), 5);
+  }
+
+  public static class GetFunction implements Function<byte[], Get> {
+
+    private static final long serialVersionUID = 1L;
+
+    public Get call(byte[] v) throws Exception {
+      return new Get(v);
+    }
+  }
+
+  public static class ResultFunction implements Function<Result, String> {
+
+    private static final long serialVersionUID = 1L;
+
+    public String call(Result result) throws Exception {
+      Iterator<Cell> it = result.listCells().iterator();
+      StringBuilder b = new StringBuilder();
+
+      b.append(Bytes.toString(result.getRow())).append(":");
+
+      while (it.hasNext()) {
+        Cell cell = it.next();
+        String q = Bytes.toString(CellUtil.cloneQualifier(cell));
+        if ("counter".equals(q)) {
+          b.append("(")
+              .append(q)
+              .append(",")
+              .append(Bytes.toLong(CellUtil.cloneValue(cell)))
+              .append(")");
+        } else {
+          b.append("(")
+              .append(q)
+              .append(",")
+              .append(Bytes.toString(CellUtil.cloneValue(cell)))
+              .append(")");
+        }
+      }
+      return b.toString();
+    }
+  }
+
+  private void populateTableWithMockData(Configuration conf, TableName tableName)
+      throws IOException {
+    try (
+        Connection conn = ConnectionFactory.createConnection(conf);
+        Table table = conn.getTable(tableName)) {
+
+      List<Put> puts = new ArrayList<>();
+
+      for (int i = 1; i < 6; i++) {
+        Put put = new Put(Bytes.toBytes(Integer.toString(i)));
+        put.addColumn(columnFamily, columnFamily, columnFamily);
+        puts.add(put);
+      }
+      table.put(puts);
+    }
+  }
+
+}
\ No newline at end of file
-- 
2.1.0