diff --git a/hcatalog/build-support/ant/checkstyle.xml b/hcatalog/build-support/ant/checkstyle.xml
index 93218c4..5ed9516 100644
--- a/hcatalog/build-support/ant/checkstyle.xml
+++ b/hcatalog/build-support/ant/checkstyle.xml
@@ -33,6 +33,7 @@
+
@@ -48,6 +49,12 @@
+
+
+
+
+
+
diff --git a/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/HCatStorageHandler.java b/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/HCatStorageHandler.java
index 2c595a8..6dfa506 100644
--- a/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/HCatStorageHandler.java
+++ b/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/HCatStorageHandler.java
@@ -31,7 +31,7 @@
 /**
  * The abstract Class HCatStorageHandler would server as the base class for all
  * the storage handlers required for non-native tables in HCatalog.
- * @deprecated Use/modify {@link org.apache.hcatalog.mapreduce.HCatStorageHandler} instead
+ * @deprecated Use/modify {@link org.apache.hadoop.hive.ql.metadata.HiveStorageHandler} instead
  */
 public abstract class HCatStorageHandler implements HiveStorageHandler {
diff --git a/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java b/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java
index c304e48..46eb157 100644
--- a/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java
+++ b/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java
@@ -32,9 +32,9 @@
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.util.Progressable;
-import org.apache.hive.hcatalog.common.HCatConstants;
-import org.apache.hive.hcatalog.common.HCatUtil;
-import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
 
 public class HBaseBaseOutputFormat implements OutputFormat, Put>,
   HiveOutputFormat, Put> {
diff --git a/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java b/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java
index 242b546..6c1c541 100644
--- a/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java
+++ b/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java
@@ -19,7 +19,7 @@
 package org.apache.hcatalog.hbase;
 
-import org.apache.hive.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatConstants;
 
 /**
  * Constants class for constants used in HBase storage handler.
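The retargeted `@deprecated` tag above sends implementers to Hive's own `HiveStorageHandler` interface. A minimal sketch of a handler written directly against that interface follows; it is not part of this patch, the class, package, and `example.handler.table.name` property are invented for illustration, and it extends `DefaultStorageHandler` (the same base class this patch removes from the HBase handler) so that only a few methods need overriding:

```java
package org.example.handler; // hypothetical package, not part of this patch

import java.util.Map;

import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.SequenceFileInputFormat;

/**
 * Sketch of a non-native storage handler built on the Hive-side interface
 * that the new @deprecated tag points to, rather than on HCatStorageHandler.
 */
public class ExampleStorageHandler extends DefaultStorageHandler {

  // hypothetical job property used to hand table metadata to the input format
  private static final String TABLE_NAME_PROPERTY = "example.handler.table.name";

  @Override
  public Class<? extends InputFormat> getInputFormatClass() {
    // a real handler returns the InputFormat of the system it wraps
    return SequenceFileInputFormat.class;
  }

  @Override
  public Class<? extends OutputFormat> getOutputFormatClass() {
    // a real handler returns the OutputFormat of the system it wraps
    return HiveSequenceFileOutputFormat.class;
  }

  @Override
  public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
    // copy whatever the InputFormat needs from the table definition into the job
    jobProperties.put(TABLE_NAME_PROPERTY,
        tableDesc.getProperties().getProperty(hive_metastoreConstants.META_TABLE_NAME));
  }
}
```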
diff --git a/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java b/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java
index f4649e6..67ab2bc 100644
--- a/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java
+++ b/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java
@@ -47,7 +47,6 @@
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
 import org.apache.hadoop.hive.serde2.SerDe;
@@ -55,18 +54,19 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hive.hcatalog.common.HCatConstants;
-import org.apache.hive.hcatalog.common.HCatUtil;
-import org.apache.hive.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatStorageHandler;
 import org.apache.hcatalog.hbase.HBaseBulkOutputFormat.HBaseBulkOutputCommitter;
 import org.apache.hcatalog.hbase.HBaseDirectOutputFormat.HBaseDirectOutputCommitter;
 import org.apache.hcatalog.hbase.snapshot.RevisionManager;
 import org.apache.hcatalog.hbase.snapshot.RevisionManagerConfiguration;
 import org.apache.hcatalog.hbase.snapshot.Transaction;
-import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat;
-import org.apache.hive.hcatalog.mapreduce.HCatTableInfo;
-import org.apache.hive.hcatalog.mapreduce.InputJobInfo;
-import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hcatalog.mapreduce.HCatTableInfo;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
 import org.apache.thrift.TBase;
 import org.apache.zookeeper.ZooKeeper;
 
@@ -77,8 +77,13 @@
 * This class HBaseHCatStorageHandler provides functionality to create HBase
 * tables through HCatalog. The implementation is very similar to the
 * HiveHBaseStorageHandler, with more details to suit HCatalog.
+ *
+ * Note: As of 0.12, this class is considered deprecated and a candidate for future removal.
+ * All new code must use the Hive HBaseStorageHandler instead.
+ *
+ * @deprecated Use/modify {@link org.apache.hadoop.hive.hbase.HBaseStorageHandler} instead
 */
-public class HBaseHCatStorageHandler extends DefaultStorageHandler implements HiveMetaHook, Configurable {
+public class HBaseHCatStorageHandler extends HCatStorageHandler implements HiveMetaHook, Configurable {
 
   public final static String DEFAULT_PREFIX = "default.";
   private final static String PROPERTY_INT_OUTPUT_LOCATION = "hcat.hbase.mapreduce.intermediateOutputLocation";
 
@@ -448,6 +453,7 @@ static String getFullyQualifiedHBaseTableName(HCatTableInfo tableInfo) {
     return HBaseInputFormat.class;
   }
 
+  @Deprecated
   @Override
   public Class getOutputFormatClass() {
     return HBaseBaseOutputFormat.class;
@@ -468,12 +474,19 @@ static String getFullyQualifiedHBaseTableName(HCatTableInfo tableInfo) {
     return HBaseSerDe.class;
   }
 
+  @Deprecated
   public Configuration getJobConf() {
     return jobConf;
   }
 
   @Deprecated
   @Override
+  public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
+    // do nothing
+  }
+
+  @Deprecated
+  @Override
   public Configuration getConf() {
 
     if (hbaseConf == null) {
diff --git a/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java b/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java
index 3fe468c..7f8444c 100644
--- a/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java
+++ b/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java
@@ -34,9 +34,9 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hive.hcatalog.common.HCatConstants;
-import org.apache.hive.hcatalog.common.HCatUtil;
-import org.apache.hive.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
 
 /**
  * This class HBaseInputFormat is a wrapper class of TableInputFormat in HBase.
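The note added above directs new code to Hive's HBaseStorageHandler. For reference, here is a condensed sketch of that usage, pieced together from the test code this patch deletes further down; the table, column family, and qualifier names are illustrative and error handling is kept minimal:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hcatalog.cli.HCatDriver;
import org.apache.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hcatalog.mapreduce.InputJobInfo;

public class HBaseStorageHandlerReadSketch {

  /** Create an HBase-backed table via the Hive handler, then read it with HCatInputFormat. */
  public static void runSketch(Configuration conf, HCatDriver hcatDriver) throws Exception {
    // DDL goes through the Hive HBaseStorageHandler rather than the deprecated HCatalog handler
    CommandProcessorResponse resp = hcatDriver.run(
        "CREATE TABLE my_hbase_table (key string, q1 string, q2 string) "
            + "STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' "
            + "WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,f:q1,f:q2')");
    if (resp.getResponseCode() != 0) {
      throw new IllegalStateException("CREATE TABLE failed, response code " + resp.getResponseCode());
    }

    // Reading is unchanged: HCatInputFormat resolves the storage handler from the table metadata
    Job job = new Job(conf, "hbase-storage-handler-read");
    job.setInputFormatClass(HCatInputFormat.class);
    HCatInputFormat.setInput(job, InputJobInfo.create("default", "my_hbase_table", null));
    job.setOutputFormatClass(TextOutputFormat.class);
    // mapper, output path, key/value classes and job submission as in an ordinary MR job
  }
}
```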
diff --git a/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java b/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java index ba6271c..9f8af66 100644 --- a/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java +++ b/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java @@ -29,18 +29,18 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.hbase.HBaseSerDe; -import org.apache.hive.hcatalog.common.HCatConstants; -import org.apache.hive.hcatalog.common.HCatUtil; -import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; -import org.apache.hive.hcatalog.data.schema.HCatSchema; +import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.common.HCatUtil; +import org.apache.hcatalog.data.schema.HCatFieldSchema; +import org.apache.hcatalog.data.schema.HCatSchema; import org.apache.hcatalog.hbase.snapshot.RevisionManager; import org.apache.hcatalog.hbase.snapshot.RevisionManagerFactory; import org.apache.hcatalog.hbase.snapshot.TableSnapshot; import org.apache.hcatalog.hbase.snapshot.Transaction; -import org.apache.hive.hcatalog.mapreduce.HCatTableInfo; -import org.apache.hive.hcatalog.mapreduce.InputJobInfo; -import org.apache.hive.hcatalog.mapreduce.OutputJobInfo; -import org.apache.hive.hcatalog.mapreduce.StorerInfo; +import org.apache.hcatalog.mapreduce.HCatTableInfo; +import org.apache.hcatalog.mapreduce.InputJobInfo; +import org.apache.hcatalog.mapreduce.OutputJobInfo; +import org.apache.hcatalog.mapreduce.StorerInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java b/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java index 7d44334..49d05cc 100644 --- a/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java +++ b/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java @@ -38,11 +38,11 @@ import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.mapred.RecordReader; -import org.apache.hive.hcatalog.common.HCatUtil; +import org.apache.hcatalog.common.HCatUtil; import org.apache.hcatalog.hbase.snapshot.FamilyRevision; import org.apache.hcatalog.hbase.snapshot.RevisionManager; import org.apache.hcatalog.hbase.snapshot.TableSnapshot; -import org.apache.hive.hcatalog.mapreduce.InputJobInfo; +import org.apache.hcatalog.mapreduce.InputJobInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/ResultConverter.java b/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/ResultConverter.java index 15ca56b..a19abe2 100644 --- a/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/ResultConverter.java +++ b/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/ResultConverter.java @@ -21,7 +21,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; -import org.apache.hive.hcatalog.data.HCatRecord; +import org.apache.hcatalog.data.HCatRecord; import java.io.IOException; diff --git a/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java 
b/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java index 3cb8c9f..32e8dd7 100644 --- a/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java +++ b/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java @@ -47,13 +47,13 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; -import org.apache.hive.hcatalog.cli.HCatDriver; -import org.apache.hive.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer; -import org.apache.hive.hcatalog.common.HCatConstants; -import org.apache.hive.hcatalog.common.HCatUtil; -import org.apache.hive.hcatalog.data.DefaultHCatRecord; -import org.apache.hive.hcatalog.data.HCatRecord; -import org.apache.hive.hcatalog.data.schema.HCatSchema; +import org.apache.hcatalog.cli.HCatDriver; +import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer; +import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.common.HCatUtil; +import org.apache.hcatalog.data.DefaultHCatRecord; +import org.apache.hcatalog.data.HCatRecord; +import org.apache.hcatalog.data.schema.HCatSchema; import org.apache.hcatalog.hbase.HBaseBulkOutputFormat.HBaseBulkOutputCommitter; import org.apache.hcatalog.hbase.TestHBaseDirectOutputFormat.MapReadAbortedTransaction; import org.apache.hcatalog.hbase.TestHBaseDirectOutputFormat.MapWriteAbortTransaction; @@ -62,9 +62,9 @@ import org.apache.hcatalog.hbase.snapshot.RevisionManagerConfiguration; import org.apache.hcatalog.hbase.snapshot.TableSnapshot; import org.apache.hcatalog.hbase.snapshot.Transaction; -import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; -import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat; -import org.apache.hive.hcatalog.mapreduce.OutputJobInfo; +import org.apache.hcatalog.mapreduce.HCatInputFormat; +import org.apache.hcatalog.mapreduce.HCatOutputFormat; +import org.apache.hcatalog.mapreduce.OutputJobInfo; import org.junit.Test; import org.slf4j.Logger; diff --git a/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java b/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java index 97f4875..7ae762a 100644 --- a/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java +++ b/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java @@ -48,21 +48,21 @@ import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; -import org.apache.hive.hcatalog.cli.HCatDriver; -import org.apache.hive.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer; -import org.apache.hive.hcatalog.common.HCatConstants; -import org.apache.hive.hcatalog.common.HCatUtil; -import org.apache.hive.hcatalog.data.DefaultHCatRecord; -import org.apache.hive.hcatalog.data.HCatRecord; -import org.apache.hive.hcatalog.data.schema.HCatSchema; +import org.apache.hcatalog.cli.HCatDriver; +import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer; +import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.common.HCatUtil; +import org.apache.hcatalog.data.DefaultHCatRecord; +import org.apache.hcatalog.data.HCatRecord; +import 
org.apache.hcatalog.data.schema.HCatSchema; import org.apache.hcatalog.hbase.snapshot.FamilyRevision; import org.apache.hcatalog.hbase.snapshot.RevisionManager; import org.apache.hcatalog.hbase.snapshot.RevisionManagerConfiguration; import org.apache.hcatalog.hbase.snapshot.TableSnapshot; import org.apache.hcatalog.hbase.snapshot.Transaction; -import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; -import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat; -import org.apache.hive.hcatalog.mapreduce.OutputJobInfo; +import org.apache.hcatalog.mapreduce.HCatInputFormat; +import org.apache.hcatalog.mapreduce.HCatOutputFormat; +import org.apache.hcatalog.mapreduce.OutputJobInfo; import org.junit.Test; import java.io.IOException; diff --git a/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseHCatStorageHandler.java b/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseHCatStorageHandler.java index 9ed0630..2c98b80 100644 --- a/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseHCatStorageHandler.java +++ b/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseHCatStorageHandler.java @@ -36,8 +36,8 @@ import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hive.hcatalog.cli.HCatDriver; -import org.apache.hive.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer; +import org.apache.hcatalog.cli.HCatDriver; +import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer; import org.apache.hcatalog.hbase.snapshot.RevisionManager; import org.apache.hcatalog.hbase.snapshot.RevisionManagerConfiguration; import org.apache.zookeeper.KeeperException.NoNodeException; diff --git a/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java b/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java deleted file mode 100644 index 085a223..0000000 --- a/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java +++ /dev/null @@ -1,310 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.hcatalog.hbase; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.TableInputFormat; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hive.cli.CliSessionState; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.hbase.HBaseSerDe; -import org.apache.hadoop.hive.metastore.MetaStoreUtils; -import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; -import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; -import org.apache.hive.hcatalog.cli.HCatDriver; -import org.apache.hive.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer; -import org.apache.hive.hcatalog.common.HCatConstants; -import org.apache.hive.hcatalog.common.HCatException; -import org.apache.hive.hcatalog.common.HCatUtil; -import org.apache.hive.hcatalog.data.HCatRecord; -import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; -import org.apache.hive.hcatalog.data.schema.HCatSchema; -import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; -import org.apache.hive.hcatalog.mapreduce.InputJobInfo; -import org.junit.Test; - -public class TestHBaseInputFormat extends SkeletonHBaseTest { - - private static HiveConf hcatConf; - private static HCatDriver hcatDriver; - private final byte[] FAMILY = Bytes.toBytes("testFamily"); - private final byte[] QUALIFIER1 = Bytes.toBytes("testQualifier1"); - private final byte[] QUALIFIER2 = Bytes.toBytes("testQualifier2"); - - public TestHBaseInputFormat() throws Exception { - hcatConf = getHiveConf(); - hcatConf.set(ConfVars.SEMANTIC_ANALYZER_HOOK.varname, - HCatSemanticAnalyzer.class.getName()); - URI fsuri = getFileSystem().getUri(); - Path whPath = new Path(fsuri.getScheme(), fsuri.getAuthority(), - getTestDir()); - hcatConf.set(HiveConf.ConfVars.HADOOPFS.varname, fsuri.toString()); - hcatConf.set(ConfVars.METASTOREWAREHOUSE.varname, whPath.toString()); - - //Add hbase properties - - for (Map.Entry el : getHbaseConf()) { - if (el.getKey().startsWith("hbase.")) { - hcatConf.set(el.getKey(), el.getValue()); - } - } - - SessionState.start(new CliSessionState(hcatConf)); - hcatDriver = new HCatDriver(); - - } - - private List generatePuts(int num, String tableName) throws IOException { - - List columnFamilies = Arrays.asList("testFamily"); - List myPuts; - myPuts = new ArrayList(); - for (int i = 1; i <= num; i++) { - Put put = new Put(Bytes.toBytes("testRow")); - put.add(FAMILY, QUALIFIER1, i, Bytes.toBytes("textValue-" + i)); - put.add(FAMILY, QUALIFIER2, i, Bytes.toBytes("textValue-" + i)); - myPuts.add(put); - } - return myPuts; - } - - private void 
populateHBaseTable(String tName, int revisions) throws IOException { - List myPuts = generatePuts(revisions, tName); - HTable table = new HTable(getHbaseConf(), Bytes.toBytes(tName)); - table.put(myPuts); - } - - @Test - public void TestHBaseTableReadMR() throws Exception { - String tableName = newTableName("MyTable"); - String databaseName = newTableName("MyDatabase"); - //Table name will be lower case unless specified by hbase.table.name property - String hbaseTableName = (databaseName + "." + tableName).toLowerCase(); - String db_dir = new Path(getTestDir(), "hbasedb").toString(); - - String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" - + db_dir + "'"; - String tableQuery = "CREATE TABLE " + databaseName + "." + tableName - + "(key string, testqualifier1 string, testqualifier2 string) STORED BY " + - "'org.apache.hadoop.hive.hbase.HBaseStorageHandler'" - + " WITH SERDEPROPERTIES ('hbase.columns.mapping'=':key,testFamily:testQualifier1,testFamily:testQualifier2')" ; - - CommandProcessorResponse responseOne = hcatDriver.run(dbquery); - assertEquals(0, responseOne.getResponseCode()); - CommandProcessorResponse responseTwo = hcatDriver.run(tableQuery); - assertEquals(0, responseTwo.getResponseCode()); - - HBaseAdmin hAdmin = new HBaseAdmin(getHbaseConf()); - boolean doesTableExist = hAdmin.tableExists(hbaseTableName); - assertTrue(doesTableExist); - - populateHBaseTable(hbaseTableName, 5); - Configuration conf = new Configuration(hcatConf); - conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, - HCatUtil.serialize(getHiveConf().getAllProperties())); - - conf.set(HBaseSerDe.HBASE_TABLE_NAME,hbaseTableName); - conf.set(TableInputFormat.INPUT_TABLE, hbaseTableName); - // output settings - Path outputDir = new Path(getTestDir(), "mapred/testHbaseTableMRRead"); - FileSystem fs = getFileSystem(); - if (fs.exists(outputDir)) { - fs.delete(outputDir, true); - } - // create job - Job job = new Job(conf, "hbase-mr-read-test"); - job.setJarByClass(this.getClass()); - job.setMapperClass(MapReadHTable.class); - MapReadHTable.resetCounters(); - - job.setInputFormatClass(HCatInputFormat.class); - InputJobInfo inputJobInfo = InputJobInfo.create(databaseName, tableName, - null); - HCatInputFormat.setInput(job, inputJobInfo); - job.setOutputFormatClass(TextOutputFormat.class); - TextOutputFormat.setOutputPath(job, outputDir); - job.setMapOutputKeyClass(BytesWritable.class); - job.setMapOutputValueClass(Text.class); - job.setOutputKeyClass(BytesWritable.class); - job.setOutputValueClass(Text.class); - job.setNumReduceTasks(0); - assertTrue(job.waitForCompletion(true)); - // Note: These asserts only works in case of LocalJobRunner as they run in same jvm. - // If using MiniMRCluster, the tests will have to be modified. 
- assertFalse(MapReadHTable.error); - assertEquals(MapReadHTable.count, 1); - - String dropTableQuery = "DROP TABLE " + hbaseTableName ; - CommandProcessorResponse responseThree = hcatDriver.run(dropTableQuery); - assertEquals(0, responseThree.getResponseCode()); - - boolean isHbaseTableThere = hAdmin.tableExists(hbaseTableName); - assertFalse(isHbaseTableThere); - - String dropDB = "DROP DATABASE " + databaseName; - CommandProcessorResponse responseFour = hcatDriver.run(dropDB); - assertEquals(0, responseFour.getResponseCode()); - } - - @Test - public void TestHBaseTableProjectionReadMR() throws Exception { - - String tableName = newTableName("MyTable"); - //Table name as specified by hbase.table.name property - String hbaseTableName = "MyDB_" + tableName; - String tableQuery = "CREATE TABLE " + tableName - + "(key string, testqualifier1 string, testqualifier2 string) STORED BY " - + "'org.apache.hadoop.hive.hbase.HBaseStorageHandler'" - + " WITH SERDEPROPERTIES ('hbase.columns.mapping'=" - + "':key,testFamily:testQualifier1,testFamily:testQualifier2')" - + " TBLPROPERTIES ('hbase.table.name'='" + hbaseTableName+ "')" ; - - CommandProcessorResponse responseTwo = hcatDriver.run(tableQuery); - assertEquals(0, responseTwo.getResponseCode()); - - HBaseAdmin hAdmin = new HBaseAdmin(getHbaseConf()); - boolean doesTableExist = hAdmin.tableExists(hbaseTableName); - assertTrue(doesTableExist); - - populateHBaseTable(hbaseTableName, 5); - - Configuration conf = new Configuration(hcatConf); - conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, - HCatUtil.serialize(getHiveConf().getAllProperties())); - - // output settings - Path outputDir = new Path(getTestDir(), "mapred/testHBaseTableProjectionReadMR"); - FileSystem fs = getFileSystem(); - if (fs.exists(outputDir)) { - fs.delete(outputDir, true); - } - // create job - Job job = new Job(conf, "hbase-column-projection"); - job.setJarByClass(this.getClass()); - job.setMapperClass(MapReadProjHTable.class); - job.setInputFormatClass(HCatInputFormat.class); - InputJobInfo inputJobInfo = InputJobInfo.create( - MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName, null); - HCatInputFormat.setOutputSchema(job, getProjectionSchema()); - HCatInputFormat.setInput(job, inputJobInfo); - job.setOutputFormatClass(TextOutputFormat.class); - TextOutputFormat.setOutputPath(job, outputDir); - job.setMapOutputKeyClass(BytesWritable.class); - job.setMapOutputValueClass(Text.class); - job.setOutputKeyClass(BytesWritable.class); - job.setOutputValueClass(Text.class); - job.setNumReduceTasks(0); - assertTrue(job.waitForCompletion(true)); - assertFalse(MapReadProjHTable.error); - assertEquals(MapReadProjHTable.count, 1); - - String dropTableQuery = "DROP TABLE " + tableName; - CommandProcessorResponse responseThree = hcatDriver.run(dropTableQuery); - assertEquals(0, responseThree.getResponseCode()); - - boolean isHbaseTableThere = hAdmin.tableExists(hbaseTableName); - assertFalse(isHbaseTableThere); - } - - - static class MapReadHTable - extends - Mapper, Text> { - - static boolean error = false; - static int count = 0; - - @Override - public void map(ImmutableBytesWritable key, HCatRecord value, - Context context) throws IOException, InterruptedException { - boolean correctValues = (value.size() == 3) - && (value.get(0).toString()).equalsIgnoreCase("testRow") - && (value.get(1).toString()).equalsIgnoreCase("textValue-5") - && (value.get(2).toString()).equalsIgnoreCase("textValue-5"); - - if (correctValues == false) { - error = true; - } - count++; - } - - public static void 
resetCounters() { - error = false; - count = 0; - } - } - - static class MapReadProjHTable - extends - Mapper, Text> { - - static boolean error = false; - static int count = 0; - @Override - public void map(ImmutableBytesWritable key, HCatRecord value, - Context context) throws IOException, InterruptedException { - boolean correctValues = (value.size() == 2) - && (value.get(0).toString()).equalsIgnoreCase("testRow") - && (value.get(1).toString()).equalsIgnoreCase("textValue-5"); - - if (correctValues == false) { - error = true; - } - count++; - } - } - - private HCatSchema getProjectionSchema() throws HCatException { - - HCatSchema schema = new HCatSchema(new ArrayList()); - schema.append(new HCatFieldSchema("key", HCatFieldSchema.Type.STRING, - "")); - schema.append(new HCatFieldSchema("testqualifier1", - HCatFieldSchema.Type.STRING, "")); - return schema; - } - - -} diff --git a/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHCatHBaseInputFormat.java b/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHCatHBaseInputFormat.java index 13ef338..52ccb02 100644 --- a/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHCatHBaseInputFormat.java +++ b/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHCatHBaseInputFormat.java @@ -59,20 +59,20 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; -import org.apache.hive.hcatalog.cli.HCatDriver; -import org.apache.hive.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer; -import org.apache.hive.hcatalog.common.HCatConstants; -import org.apache.hive.hcatalog.common.HCatException; -import org.apache.hive.hcatalog.common.HCatUtil; -import org.apache.hive.hcatalog.data.HCatRecord; -import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; -import org.apache.hive.hcatalog.data.schema.HCatSchema; +import org.apache.hcatalog.cli.HCatDriver; +import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer; +import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.common.HCatException; +import org.apache.hcatalog.common.HCatUtil; +import org.apache.hcatalog.data.HCatRecord; +import org.apache.hcatalog.data.schema.HCatFieldSchema; +import org.apache.hcatalog.data.schema.HCatSchema; import org.apache.hcatalog.hbase.snapshot.RevisionManager; import org.apache.hcatalog.hbase.snapshot.RevisionManagerConfiguration; import org.apache.hcatalog.hbase.snapshot.Transaction; -import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; -import org.apache.hive.hcatalog.mapreduce.InputJobInfo; -import org.apache.hive.hcatalog.mapreduce.PartInfo; +import org.apache.hcatalog.mapreduce.HCatInputFormat; +import org.apache.hcatalog.mapreduce.InputJobInfo; +import org.apache.hcatalog.mapreduce.PartInfo; import org.junit.Test; public class TestHCatHBaseInputFormat extends SkeletonHBaseTest { diff --git a/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHiveHBaseStorageHandler.java b/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHiveHBaseStorageHandler.java deleted file mode 100644 index 19f3143..0000000 --- a/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHiveHBaseStorageHandler.java +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hcatalog.hbase; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.net.URI; -import java.util.Map; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hive.cli.CliSessionState; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.metastore.Warehouse; -import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; -import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hive.hcatalog.cli.HCatDriver; -import org.apache.hive.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer; -import org.junit.Test; - -public class TestHiveHBaseStorageHandler extends SkeletonHBaseTest { - - private static HiveConf hcatConf; - private static HCatDriver hcatDriver; - private static Warehouse wh; - - public void Initialize() throws Exception { - - hcatConf = getHiveConf(); - hcatConf.set(ConfVars.SEMANTIC_ANALYZER_HOOK.varname, - HCatSemanticAnalyzer.class.getName()); - URI fsuri = getFileSystem().getUri(); - Path whPath = new Path(fsuri.getScheme(), fsuri.getAuthority(), - getTestDir()); - hcatConf.set(HiveConf.ConfVars.HADOOPFS.varname, fsuri.toString()); - hcatConf.set(ConfVars.METASTOREWAREHOUSE.varname, whPath.toString()); - - //Add hbase properties - for (Map.Entry el : getHbaseConf()) { - if (el.getKey().startsWith("hbase.")) { - hcatConf.set(el.getKey(), el.getValue()); - } - } - - SessionState.start(new CliSessionState(hcatConf)); - hcatDriver = new HCatDriver(); - - } - - @Test - public void testTableCreateDrop() throws Exception { - Initialize(); - - hcatDriver.run("drop table test_table"); - CommandProcessorResponse response = hcatDriver - .run("create table test_table(key int, value string) STORED BY " + - "'org.apache.hadoop.hive.hbase.HBaseStorageHandler'" - + " WITH SERDEPROPERTIES ('hbase.columns.mapping'=':key,cf1:val')"); - - assertEquals(0, response.getResponseCode()); - - HBaseAdmin hAdmin = new HBaseAdmin(getHbaseConf()); - boolean doesTableExist = hAdmin.tableExists("test_table"); - - assertTrue(doesTableExist); - - hcatDriver.run("drop table test_table"); - doesTableExist = hAdmin.tableExists("test_table"); - assertTrue(doesTableExist == false); - - } - public void testHBaseTableCreateDrop() throws Exception { - Initialize(); - - hcatDriver.run("drop table test_table"); - CommandProcessorResponse response = hcatDriver - .run("create table test_table(key int, value string) STORED BY " + - "'org.apache.hadoop.hive.hbase.HBaseStorageHandler'" - + " WITH SERDEPROPERTIES ('hbase.columns.mapping'=':key,cf1:val')"); - 
- assertEquals(0, response.getResponseCode()); - - HBaseAdmin hAdmin = new HBaseAdmin(getHbaseConf()); - boolean doesTableExist = hAdmin.tableExists("test_table"); - - assertTrue(doesTableExist); - - hcatDriver.run("drop table test_table"); - doesTableExist = hAdmin.tableExists("test_table"); - assertTrue(doesTableExist == false); - - } - - @Test - public void testTableCreateDropDifferentCase() throws Exception { - Initialize(); - - hcatDriver.run("drop table test_Table"); - CommandProcessorResponse response = hcatDriver - .run("create table test_Table(key int, value string) STORED BY " + - "'org.apache.hadoop.hive.hbase.HBaseStorageHandler'" - + " WITH SERDEPROPERTIES ('hbase.columns.mapping'=':key,cf1:val')"); - - assertEquals(0, response.getResponseCode()); - - //HBase table gets created with the specific case - HBaseAdmin hAdmin = new HBaseAdmin(getHbaseConf()); - boolean doesTableExist = hAdmin.tableExists("test_table"); - - assertTrue(doesTableExist); - - hcatDriver.run("drop table test_table"); - doesTableExist = hAdmin.tableExists("test_table"); - assertTrue(doesTableExist == false); - } - - @Test - public void testTableCreateDropCaseSensitive() throws Exception { - Initialize(); - - hcatDriver.run("drop table test_Table"); - CommandProcessorResponse response = hcatDriver - .run("create table test_Table(key int, value string) STORED BY " + - "'org.apache.hadoop.hive.hbase.HBaseStorageHandler'" - + " WITH SERDEPROPERTIES ('hbase.columns.mapping'=':key,cf1:val')" + - " TBLPROPERTIES ('hbase.table.name'='CaseSensitiveTable')"); - - assertEquals(0, response.getResponseCode()); - - HBaseAdmin hAdmin = new HBaseAdmin(getHbaseConf()); - boolean doesTableExist = hAdmin.tableExists("CaseSensitiveTable"); - - assertTrue(doesTableExist); - - - hcatDriver.run("drop table test_table"); - doesTableExist = hAdmin.tableExists("CaseSensitiveTable"); - assertTrue(doesTableExist == false); - - } - - @Test - public void testTableDropNonExistent() throws Exception { - Initialize(); - - hcatDriver.run("drop table mytable"); - CommandProcessorResponse response = hcatDriver - .run("create table mytable(key int, value string) STORED BY " + - "'org.apache.hadoop.hive.hbase.HBaseStorageHandler'" - + " WITH SERDEPROPERTIES ('hbase.columns.mapping'=':key,cf1:val')"); - - assertEquals(0, response.getResponseCode()); - - HBaseAdmin hAdmin = new HBaseAdmin(getHbaseConf()); - boolean doesTableExist = hAdmin.tableExists("mytable"); - assertTrue(doesTableExist); - - //Now delete the table from hbase - if (hAdmin.isTableEnabled("mytable")) { - hAdmin.disableTable("mytable"); - } - hAdmin.deleteTable("mytable"); - doesTableExist = hAdmin.tableExists("mytable"); - assertTrue(doesTableExist == false); - - CommandProcessorResponse responseTwo = hcatDriver.run("drop table mytable"); - assertTrue(responseTwo.getResponseCode() != 0); - - } - - @Test - public void testTableCreateExternal() throws Exception { - - String tableName = "testTable"; - HBaseAdmin hAdmin = new HBaseAdmin(getHbaseConf()); - - HTableDescriptor tableDesc = new HTableDescriptor(tableName); - tableDesc.addFamily(new HColumnDescriptor(Bytes.toBytes("key"))); - tableDesc.addFamily(new HColumnDescriptor(Bytes.toBytes("familyone"))); - tableDesc.addFamily(new HColumnDescriptor(Bytes.toBytes("familytwo"))); - - hAdmin.createTable(tableDesc); - boolean doesTableExist = hAdmin.tableExists(tableName); - assertTrue(doesTableExist); - - hcatDriver.run("drop table mytabletwo"); - CommandProcessorResponse response = hcatDriver - .run("create external table 
mytabletwo(key int, valueone string, valuetwo string) STORED BY " + - "'org.apache.hadoop.hive.hbase.HBaseStorageHandler'" - + " WITH SERDEPROPERTIES ('hbase.columns.mapping'=':key,familyone:val,familytwo:val') " + - " TBLPROPERTIES ('hbase.table.name'='testTable')"); - - assertEquals(0, response.getResponseCode()); - - } - - -} diff --git a/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHiveHBaseTableOutputFormat.java b/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHiveHBaseTableOutputFormat.java deleted file mode 100644 index 9dbb251..0000000 --- a/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHiveHBaseTableOutputFormat.java +++ /dev/null @@ -1,341 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hcatalog.hbase; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.mapred.TableOutputFormat; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hive.cli.CliSessionState; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.hbase.HBaseSerDe; -import org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.RunningJob; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; -import org.apache.hive.hcatalog.cli.HCatDriver; -import org.apache.hive.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer; -import org.apache.hive.hcatalog.common.ErrorType; -import org.apache.hive.hcatalog.common.HCatConstants; -import org.apache.hive.hcatalog.common.HCatException; -import org.apache.hive.hcatalog.common.HCatUtil; -import org.apache.hive.hcatalog.data.DefaultHCatRecord; -import org.apache.hive.hcatalog.data.HCatRecord; -import org.apache.hive.hcatalog.data.schema.HCatSchema; -import 
org.apache.hive.hcatalog.mapreduce.HCatOutputFormat; -import org.apache.hive.hcatalog.mapreduce.OutputJobInfo; -import org.junit.Test; - -import java.io.IOException; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -/** - * Test HBaseDirectOUtputFormat and HBaseStorageHandler using a MiniCluster - */ -public class TestHiveHBaseTableOutputFormat extends SkeletonHBaseTest { - - private final HiveConf allConf; - private final HCatDriver hcatDriver; - - public TestHiveHBaseTableOutputFormat() { - allConf = getHiveConf(); - allConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname, - HCatSemanticAnalyzer.class.getName()); - allConf.set(HiveConf.ConfVars.HADOOPFS.varname, getFileSystem().getUri().toString()); - allConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, new Path(getTestDir(),"warehouse").toString()); - - //Add hbase properties - for (Map.Entry el : getHbaseConf()) - if (el.getKey().startsWith("hbase.")) { - allConf.set(el.getKey(), el.getValue()); - } - SessionState.start(new CliSessionState(allConf)); - hcatDriver = new HCatDriver(); - } - - @Test - public void directOutputFormatTest() throws IOException, ClassNotFoundException, InterruptedException { - String testName = "directOutputFormatTest"; - Path methodTestDir = new Path(getTestDir(),testName); - - String tableName = newTableName(testName).toLowerCase(); - String familyName = "my_family"; - byte[] familyNameBytes = Bytes.toBytes(familyName); - - //include hbase config in conf file - Configuration conf = new Configuration(allConf); - conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties())); - - //create table - createTable(tableName,new String[]{familyName}); - - String data[] = { - "1,english:ONE,spanish:UNO", - "2,english:TWO,spanish:DOS", - "3,english:THREE,spanish:TRES"}; - - // input/output settings - Path inputPath = new Path(methodTestDir,"mr_input"); - getFileSystem().mkdirs(inputPath); - FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt")); - for(String line: data) - os.write(Bytes.toBytes(line + "\n")); - os.close(); - - //create job - JobConf job = new JobConf(conf); - job.setJobName(testName); - job.setWorkingDirectory(new Path(methodTestDir,"mr_work")); - job.setJarByClass(this.getClass()); - job.setMapperClass(MapWrite.class); - - job.setInputFormat(org.apache.hadoop.mapred.TextInputFormat.class); - org.apache.hadoop.mapred.TextInputFormat.setInputPaths(job, inputPath); - // why we need to set all the 3 properties?? 
- job.setOutputFormat(HiveHBaseTableOutputFormat.class); - job.set(HBaseSerDe.HBASE_TABLE_NAME,tableName); - job.set(TableOutputFormat.OUTPUT_TABLE, tableName); - job.set(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".hbase.mapreduce.outputTableName", tableName); - - try { - OutputJobInfo outputJobInfo = OutputJobInfo.create("default", tableName, null); - job.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, - HCatUtil.serialize(outputJobInfo)); - } catch (Exception ex) { - throw new IOException("Serialization error " + ex.getMessage(), ex); - } - - job.setMapOutputKeyClass(BytesWritable.class); - job.setMapOutputValueClass(HCatRecord.class); - job.setOutputKeyClass(BytesWritable.class); - job.setOutputValueClass(HCatRecord.class); - job.setNumReduceTasks(0); - System.getProperty("java.classpath"); - RunningJob runJob = JobClient.runJob(job); - runJob.waitForCompletion(); - assertTrue(runJob.isSuccessful()); - - //verify - HTable table = new HTable(conf, tableName); - Scan scan = new Scan(); - scan.addFamily(familyNameBytes); - ResultScanner scanner = table.getScanner(scan); - int index=0; - for(Result result: scanner) { - String vals[] = data[index].toString().split(","); - for(int i=1;i mapperClass, - OutputJobInfo outputJobInfo, Path inputPath) throws IOException { - - try { - //now setting the schema - HiveConf hiveConf = HCatUtil.getHiveConf(conf); - HiveMetaStoreClient client = HCatUtil.getHiveClient(hiveConf); - Table table = client.getTable(outputJobInfo.getDatabaseName(), outputJobInfo.getTableName()); - StorageDescriptor tblSD = table.getSd(); - if (tblSD == null) { - throw new HCatException( - "Cannot construct partition info from an empty storage descriptor."); - } - HCatSchema tableSchema = new HCatSchema(HCatUtil.getHCatFieldSchemaList(tblSD.getCols())); - outputJobInfo.setOutputSchema(tableSchema); - } - catch(Exception e) { - if( e instanceof HCatException ) { - throw (HCatException) e; - } else { - throw new HCatException(ErrorType.ERROR_SET_OUTPUT, e); - } - } - conf.set(HBaseSerDe.HBASE_TABLE_NAME,outputJobInfo.getDatabaseName()+ "." + outputJobInfo.getTableName()); - conf.set(org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_NAME,outputJobInfo.getDatabaseName()+ "." + outputJobInfo.getTableName()); - conf.set(TableOutputFormat.OUTPUT_TABLE, outputJobInfo.getDatabaseName() + "."+ outputJobInfo.getTableName()); - conf.set(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".hbase.mapreduce.outputTableName", outputJobInfo.getDatabaseName() + "." 
+ outputJobInfo.getTableName()); - conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO,HCatUtil.serialize(outputJobInfo)); - - Job job = new Job(conf, jobName); - job.setWorkingDirectory(workingDir); - job.setJarByClass(this.getClass()); - job.setMapperClass(mapperClass); - - job.setInputFormatClass(TextInputFormat.class); - TextInputFormat.setInputPaths(job, inputPath); - //job.setOutputFormatClass(HiveHBaseTableOutputFormat.class); - job.setOutputFormatClass(HCatOutputFormat.class); - HCatOutputFormat.setOutput(job, outputJobInfo); - job.setMapOutputKeyClass(BytesWritable.class); - job.setMapOutputValueClass(HCatRecord.class); - job.setOutputKeyClass(BytesWritable.class); - job.setOutputValueClass(HCatRecord.class); - - job.setNumReduceTasks(0); - return job; - } - - public static class MapHCatWrite extends Mapper { - - @Override - public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { - OutputJobInfo jobInfo = (OutputJobInfo)HCatUtil.deserialize(context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO)); - HCatRecord record = new DefaultHCatRecord(3); - HCatSchema schema = jobInfo.getOutputSchema(); - String vals[] = value.toString().split(","); - record.setInteger("key",schema,Integer.parseInt(vals[0])); - for(int i=1;i { - - @Override - public void configure(JobConf job) { - } - - @Override - public void close() throws IOException { - } - - @Override - public void map(LongWritable key, Text value, - OutputCollector output, Reporter reporter) - throws IOException { - String vals[] = value.toString().split(","); - Put put = new Put(Bytes.toBytes(vals[0])); - for(int i=1;i el : getHbaseConf()) { - if (el.getKey().startsWith("hbase.")) { - hcatConf.set(el.getKey(), el.getValue()); - } - } - - driver = new Driver(hcatConf); - SessionState.start(new CliSessionState(hcatConf)); - - } - - private void populateHBaseTable(String tName) throws IOException { - List myPuts = generatePuts(tName); - HTable table = new HTable(getHbaseConf(), Bytes.toBytes(tName)); - table.put(myPuts); - } - - private List generatePuts(String tableName) throws IOException { - - List columnFamilies = Arrays.asList("testFamily"); - List myPuts; - myPuts = new ArrayList(); - for (int i = 1; i <=10; i++) { - Put put = new Put(Bytes.toBytes(i)); - put.add(FAMILY, QUALIFIER1, 1, Bytes.toBytes("textA-" + i)); - put.add(FAMILY, QUALIFIER2, 1, Bytes.toBytes("textB-" + i)); - myPuts.add(put); - } - return myPuts; - } - - public static void createTestDataFile(String filename) throws IOException { - FileWriter writer = null; - int LOOP_SIZE = 10; - float f = -100.1f; - try { - File file = new File(filename); - file.deleteOnExit(); - writer = new FileWriter(file); - - for (int i =1; i <= LOOP_SIZE; i++) { - writer.write(i+ "\t" +(f+i)+ "\t" + "textB-" + i + "\n"); - } - } finally { - if (writer != null) { - writer.close(); - } - } - - } - - @Test - public void testPigHBaseSchema() throws Exception { - Initialize(); - - String tableName = newTableName("MyTable"); - String databaseName = newTableName("MyDatabase"); - //Table name will be lower case unless specified by hbase.table.name property - String hbaseTableName = "testTable"; - String db_dir = getTestDir() + "/hbasedb"; - - String dbQuery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" - + db_dir + "'"; - - String deleteQuery = "DROP TABLE "+databaseName+"."+tableName; - - String tableQuery = "CREATE TABLE " + databaseName + "." 
+ tableName - + "(key float, testqualifier1 string, testqualifier2 int) STORED BY " + - "'org.apache.hadoop.hive.hbase.HBaseStorageHandler'" - + " WITH SERDEPROPERTIES ('hbase.columns.mapping'=':key,testFamily:testQualifier1,testFamily:testQualifier2')" - + " TBLPROPERTIES ('hbase.table.name'='"+hbaseTableName+"')"; - - CommandProcessorResponse responseOne = driver.run(deleteQuery); - assertEquals(0, responseOne.getResponseCode()); - - - CommandProcessorResponse responseTwo = driver.run(dbQuery); - assertEquals(0, responseTwo.getResponseCode()); - - - CommandProcessorResponse responseThree = driver.run(tableQuery); - - HBaseAdmin hAdmin = new HBaseAdmin(getHbaseConf()); - boolean doesTableExist = hAdmin.tableExists(hbaseTableName); - assertTrue(doesTableExist); - - PigServer server = new PigServer(ExecType.LOCAL,hcatConf.getAllProperties()); - server.registerQuery("A = load '"+databaseName+"."+tableName+"' using org.apache.hive.hcatalog.pig.HCatLoader();"); - - Schema dumpedASchema = server.dumpSchema("A"); - - List fields = dumpedASchema.getFields(); - assertEquals(3, fields.size()); - - assertEquals(DataType.FLOAT,fields.get(0).type); - assertEquals("key",fields.get(0).alias.toLowerCase()); - - assertEquals( DataType.CHARARRAY,fields.get(1).type); - assertEquals("testQualifier1".toLowerCase(), fields.get(1).alias.toLowerCase()); - - assertEquals( DataType.INTEGER,fields.get(2).type); - assertEquals("testQualifier2".toLowerCase(), fields.get(2).alias.toLowerCase()); - - } - - - @Test - public void testPigFilterProjection() throws Exception { - Initialize(); - - String tableName = newTableName("MyTable"); - String databaseName = newTableName("MyDatabase"); - //Table name will be lower case unless specified by hbase.table.name property - String hbaseTableName = (databaseName + "." + tableName).toLowerCase(); - String db_dir = getTestDir() + "/hbasedb"; - - String dbQuery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" - + db_dir + "'"; - - String deleteQuery = "DROP TABLE "+databaseName+"."+tableName; - - String tableQuery = "CREATE TABLE " + databaseName + "." 
+ tableName - + "(key int, testqualifier1 string, testqualifier2 string) STORED BY " + - "'org.apache.hadoop.hive.hbase.HBaseStorageHandler'" + - " WITH SERDEPROPERTIES ('hbase.columns.mapping'=':key,testFamily:testQualifier1,testFamily:testQualifier2')" + - " TBLPROPERTIES ('hbase.table.default.storage.type'='binary')"; - - CommandProcessorResponse responseOne = driver.run(deleteQuery); - assertEquals(0, responseOne.getResponseCode()); - - - CommandProcessorResponse responseTwo = driver.run(dbQuery); - assertEquals(0, responseTwo.getResponseCode()); - - - CommandProcessorResponse responseThree = driver.run(tableQuery); - - HBaseAdmin hAdmin = new HBaseAdmin(getHbaseConf()); - boolean doesTableExist = hAdmin.tableExists(hbaseTableName); - assertTrue(doesTableExist); - - populateHBaseTable(hbaseTableName); - - Configuration conf = new Configuration(getHbaseConf()); - HTable table = new HTable(conf, hbaseTableName); - Scan scan = new Scan(); - scan.addFamily(Bytes.toBytes("testFamily")); - ResultScanner scanner = table.getScanner(scan); - int index=1; - - PigServer server = new PigServer(ExecType.LOCAL,hcatConf.getAllProperties()); - server.registerQuery("A = load '"+databaseName+"."+tableName+"' using org.apache.hive.hcatalog.pig.HCatLoader();"); - server.registerQuery("B = filter A by key < 5;"); - server.registerQuery("C = foreach B generate key,testqualifier2;"); - Iterator itr = server.openIterator("C"); - //verify if the filter is correct and returns 2 rows and contains 2 columns and the contents match - while(itr.hasNext()){ - Tuple t = itr.next(); - assertTrue(t.size() == 2); - assertTrue(t.get(0).getClass() == Integer.class); - assertEquals(index,t.get(0)); - assertTrue(t.get(1).getClass() == String.class); - assertEquals("textB-"+index,t.get(1)); - index++; - } - assertEquals(index-1,4); - } - - @Test - public void testPigPopulation() throws Exception { - Initialize(); - - String tableName = newTableName("MyTable"); - String databaseName = newTableName("MyDatabase"); - //Table name will be lower case unless specified by hbase.table.name property - String hbaseTableName = (databaseName + "." + tableName).toLowerCase(); - String db_dir = getTestDir() + "/hbasedb"; - String POPTXT_FILE_NAME = db_dir+"testfile.txt"; - float f = -100.1f; - - String dbQuery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" - + db_dir + "'"; - - String deleteQuery = "DROP TABLE "+databaseName+"."+tableName; - - String tableQuery = "CREATE TABLE " + databaseName + "." 
+ tableName - + "(key int, testqualifier1 float, testqualifier2 string) STORED BY " + - "'org.apache.hadoop.hive.hbase.HBaseStorageHandler'" - + " WITH SERDEPROPERTIES ('hbase.columns.mapping'=':key,testFamily:testQualifier1,testFamily:testQualifier2')" - + " TBLPROPERTIES ('hbase.table.default.storage.type'='binary')"; - - - String selectQuery = "SELECT * from "+databaseName.toLowerCase()+"."+tableName.toLowerCase(); - - - CommandProcessorResponse responseOne = driver.run(deleteQuery); - assertEquals(0, responseOne.getResponseCode()); - - - CommandProcessorResponse responseTwo = driver.run(dbQuery); - assertEquals(0, responseTwo.getResponseCode()); - - - CommandProcessorResponse responseThree = driver.run(tableQuery); - - HBaseAdmin hAdmin = new HBaseAdmin(getHbaseConf()); - boolean doesTableExist = hAdmin.tableExists(hbaseTableName); - assertTrue(doesTableExist); - - - createTestDataFile(POPTXT_FILE_NAME); - - PigServer server = new PigServer(ExecType.LOCAL,hcatConf.getAllProperties()); - server.registerQuery("A = load '"+POPTXT_FILE_NAME+"' using PigStorage() as (key:int, testqualifier1:float, testqualifier2:chararray);"); - server.registerQuery("B = filter A by (key > 2) AND (key < 8) ;"); - server.registerQuery("store B into '"+databaseName.toLowerCase()+"."+tableName.toLowerCase()+"' using org.apache.hive.hcatalog.pig.HCatStorer();"); - server.registerQuery("C = load '"+databaseName.toLowerCase()+"."+tableName.toLowerCase()+"' using org.apache.hive.hcatalog.pig.HCatLoader();"); - // Schema should be same - Schema dumpedBSchema = server.dumpSchema("C"); - - List fields = dumpedBSchema.getFields(); - assertEquals(3, fields.size()); - - assertEquals(DataType.INTEGER,fields.get(0).type); - assertEquals("key",fields.get(0).alias.toLowerCase()); - - assertEquals( DataType.FLOAT,fields.get(1).type); - assertEquals("testQualifier1".toLowerCase(), fields.get(1).alias.toLowerCase()); - - assertEquals( DataType.CHARARRAY,fields.get(2).type); - assertEquals("testQualifier2".toLowerCase(), fields.get(2).alias.toLowerCase()); - - //Query the hbase table and check the key is valid and only 5 are present - Configuration conf = new Configuration(getHbaseConf()); - HTable table = new HTable(conf, hbaseTableName); - Scan scan = new Scan(); - scan.addFamily(Bytes.toBytes("testFamily")); - byte[] familyNameBytes = Bytes.toBytes("testFamily"); - ResultScanner scanner = table.getScanner(scan); - int index=3; - int count=0; - for(Result result: scanner) { - //key is correct - assertEquals(index,Bytes.toInt(result.getRow())); - //first column exists - assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes("testQualifier1"))); - //value is correct - assertEquals((index+f),Bytes.toFloat(result.getValue(familyNameBytes,Bytes.toBytes("testQualifier1"))),0); - - //second column exists - assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes("testQualifier2"))); - //value is correct - assertEquals(("textB-"+index).toString(),Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes("testQualifier2")))); - index++; - count++; - } - // 5 rows should be returned - assertEquals(count,5); - - //Check if hive returns results correctly - driver.run(selectQuery); - ArrayList result = new ArrayList(); - driver.getResults(result); - //Query using the hive command line - assertEquals(5, result.size()); - Iterator itr = result.iterator(); - for(int i = 3; i <= 7; i++) { - String tokens[] = itr.next().split("\\s+"); - assertEquals(i,Integer.parseInt(tokens[0])); - 
assertEquals(i+f,Float.parseFloat(tokens[1]),0); - assertEquals(("textB-"+i).toString(),tokens[2]); - } - - //delete the table from the database - CommandProcessorResponse responseFour = driver.run(deleteQuery); - assertEquals(0, responseFour.getResponseCode()); - - } - -} diff --git a/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java b/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java index 42fb068..ff558f5 100644 --- a/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java +++ b/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java @@ -34,13 +34,13 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.mapreduce.Job; -import org.apache.hive.hcatalog.cli.HCatDriver; -import org.apache.hive.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer; -import org.apache.hive.hcatalog.common.HCatConstants; -import org.apache.hive.hcatalog.common.HCatUtil; +import org.apache.hcatalog.cli.HCatDriver; +import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer; +import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.common.HCatUtil; import org.apache.hcatalog.hbase.snapshot.TableSnapshot; -import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; -import org.apache.hive.hcatalog.mapreduce.InputJobInfo; +import org.apache.hcatalog.mapreduce.HCatInputFormat; +import org.apache.hcatalog.mapreduce.InputJobInfo; import org.junit.Test; public class TestSnapshots extends SkeletonHBaseTest { diff --git a/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestZNodeSetUp.java b/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestZNodeSetUp.java index eaf434d..8122b14 100644 --- a/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestZNodeSetUp.java +++ b/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestZNodeSetUp.java @@ -33,8 +33,8 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hive.hcatalog.cli.HCatDriver; -import org.apache.hive.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer; +import org.apache.hcatalog.cli.HCatDriver; +import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer; import org.apache.hcatalog.hbase.SkeletonHBaseTest; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; diff --git a/hcatalog/storage-handlers/hbase/src/test/org/apache/hive/hcatalog/hbase/ManyMiniCluster.java b/hcatalog/storage-handlers/hbase/src/test/org/apache/hive/hcatalog/hbase/ManyMiniCluster.java new file mode 100644 index 0000000..ab338db --- /dev/null +++ b/hcatalog/storage-handlers/hbase/src/test/org/apache/hive/hcatalog/hbase/ManyMiniCluster.java @@ -0,0 +1,370 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.hcatalog.hbase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MiniMRCluster; + +import java.io.File; +import java.io.IOException; +import java.net.ServerSocket; + +/** + * MiniCluster class composed of a number of Hadoop Minicluster implementations + * and other necessary daemons needed for testing (HBase, Hive MetaStore, Zookeeper, MiniMRCluster) + */ +public class ManyMiniCluster { + + //MR stuff + private boolean miniMRClusterEnabled; + private MiniMRCluster mrCluster; + private int numTaskTrackers; + private JobConf jobConf; + + //HBase stuff + private boolean miniHBaseClusterEnabled; + private MiniHBaseCluster hbaseCluster; + private String hbaseRoot; + private Configuration hbaseConf; + private String hbaseDir; + + //ZK Stuff + private boolean miniZookeeperClusterEnabled; + private MiniZooKeeperCluster zookeeperCluster; + private int zookeeperPort; + private String zookeeperDir; + + //DFS Stuff + private MiniDFSCluster dfsCluster; + + //Hive Stuff + private boolean miniHiveMetastoreEnabled; + private HiveConf hiveConf; + private HiveMetaStoreClient hiveMetaStoreClient; + + private final File workDir; + private boolean started = false; + + + /** + * create a cluster instance using a builder which will expose configurable options + * @param workDir working directory ManyMiniCluster will use for all of it's *Minicluster instances + * @return a Builder instance + */ + public static Builder create(File workDir) { + return new Builder(workDir); + } + + private ManyMiniCluster(Builder b) { + workDir = b.workDir; + numTaskTrackers = b.numTaskTrackers; + hiveConf = b.hiveConf; + jobConf = b.jobConf; + hbaseConf = b.hbaseConf; + miniMRClusterEnabled = b.miniMRClusterEnabled; + miniHBaseClusterEnabled = b.miniHBaseClusterEnabled; + miniHiveMetastoreEnabled = b.miniHiveMetastoreEnabled; + miniZookeeperClusterEnabled = b.miniZookeeperClusterEnabled; + } + + protected synchronized void start() { + try { + if (!started) { + FileUtil.fullyDelete(workDir); + if (miniMRClusterEnabled) { + setupMRCluster(); + } + if (miniZookeeperClusterEnabled || miniHBaseClusterEnabled) { + miniZookeeperClusterEnabled = true; + setupZookeeper(); + } + if (miniHBaseClusterEnabled) { + setupHBaseCluster(); + } + if (miniHiveMetastoreEnabled) { + setUpMetastore(); + } + } + } catch (Exception e) { + throw new IllegalStateException("Failed to setup cluster", e); + } + } + + protected synchronized void stop() { + if (hbaseCluster != null) { + 
HConnectionManager.deleteAllConnections(true); + try { + hbaseCluster.shutdown(); + } catch (Exception e) { + e.printStackTrace(); + } + hbaseCluster = null; + } + if (zookeeperCluster != null) { + try { + zookeeperCluster.shutdown(); + } catch (Exception e) { + e.printStackTrace(); + } + zookeeperCluster = null; + } + if (mrCluster != null) { + try { + mrCluster.shutdown(); + } catch (Exception e) { + e.printStackTrace(); + } + mrCluster = null; + } + if (dfsCluster != null) { + try { + dfsCluster.getFileSystem().close(); + dfsCluster.shutdown(); + } catch (Exception e) { + e.printStackTrace(); + } + dfsCluster = null; + } + try { + FileSystem.closeAll(); + } catch (IOException e) { + e.printStackTrace(); + } + started = false; + } + + /** + * @return Configuration of mini HBase cluster + */ + public Configuration getHBaseConf() { + return HBaseConfiguration.create(hbaseConf); + } + + /** + * @return Configuration of mini MR cluster + */ + public Configuration getJobConf() { + return new Configuration(jobConf); + } + + /** + * @return Configuration of Hive Metastore, this is a standalone not a daemon + */ + public HiveConf getHiveConf() { + return new HiveConf(hiveConf); + } + + /** + * @return Filesystem used by MiniMRCluster and MiniHBaseCluster + */ + public FileSystem getFileSystem() { + try { + return FileSystem.get(jobConf); + } catch (IOException e) { + throw new IllegalStateException("Failed to get FileSystem", e); + } + } + + /** + * @return Metastore client instance + */ + public HiveMetaStoreClient getHiveMetaStoreClient() { + return hiveMetaStoreClient; + } + + private void setupMRCluster() { + try { + final int jobTrackerPort = findFreePort(); + final int taskTrackerPort = findFreePort(); + + if (jobConf == null) + jobConf = new JobConf(); + + jobConf.setInt("mapred.submit.replication", 1); + jobConf.set("yarn.scheduler.capacity.root.queues", "default"); + jobConf.set("yarn.scheduler.capacity.root.default.capacity", "100"); + //conf.set("hadoop.job.history.location",new File(workDir).getAbsolutePath()+"/history"); + System.setProperty("hadoop.log.dir", new File(workDir, "/logs").getAbsolutePath()); + + mrCluster = new MiniMRCluster(jobTrackerPort, + taskTrackerPort, + numTaskTrackers, + getFileSystem().getUri().toString(), + numTaskTrackers, + null, + null, + null, + jobConf); + + jobConf = mrCluster.createJobConf(); + } catch (IOException e) { + throw new IllegalStateException("Failed to Setup MR Cluster", e); + } + } + + private void setupZookeeper() { + try { + zookeeperDir = new File(workDir, "zk").getAbsolutePath(); + zookeeperPort = findFreePort(); + zookeeperCluster = new MiniZooKeeperCluster(); + zookeeperCluster.setDefaultClientPort(zookeeperPort); + zookeeperCluster.startup(new File(zookeeperDir)); + } catch (Exception e) { + throw new IllegalStateException("Failed to Setup Zookeeper Cluster", e); + } + } + + private void setupHBaseCluster() { + final int numRegionServers = 1; + + try { + hbaseDir = new File(workDir, "hbase").toString(); + hbaseDir = hbaseDir.replaceAll("\\\\", "/"); + hbaseRoot = "file://" + hbaseDir; + + if (hbaseConf == null) + hbaseConf = HBaseConfiguration.create(); + + hbaseConf.set("hbase.rootdir", hbaseRoot); + hbaseConf.set("hbase.master", "local"); + hbaseConf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zookeeperPort); + hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "127.0.0.1"); + hbaseConf.setInt("hbase.master.port", findFreePort()); + hbaseConf.setInt("hbase.master.info.port", -1); + hbaseConf.setInt("hbase.regionserver.port", 
findFreePort()); + hbaseConf.setInt("hbase.regionserver.info.port", -1); + + hbaseCluster = new MiniHBaseCluster(hbaseConf, numRegionServers); + hbaseConf.set("hbase.master", hbaseCluster.getMaster().getServerName().getHostAndPort()); + //opening the META table ensures that cluster is running + new HTable(hbaseConf, HConstants.META_TABLE_NAME); + } catch (Exception e) { + throw new IllegalStateException("Failed to setup HBase Cluster", e); + } + } + + private void setUpMetastore() throws Exception { + if (hiveConf == null) + hiveConf = new HiveConf(this.getClass()); + + //The default org.apache.hadoop.hive.ql.hooks.PreExecutePrinter hook + //is present only in the ql/test directory + hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); + hiveConf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, + "jdbc:derby:" + new File(workDir + "/metastore_db") + ";create=true"); + hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.toString(), + new File(workDir, "warehouse").toString()); + //set where derby logs + File derbyLogFile = new File(workDir + "/derby.log"); + derbyLogFile.createNewFile(); + System.setProperty("derby.stream.error.file", derbyLogFile.getPath()); + + +// Driver driver = new Driver(hiveConf); +// SessionState.start(new CliSessionState(hiveConf)); + + hiveMetaStoreClient = new HiveMetaStoreClient(hiveConf); + } + + private static int findFreePort() throws IOException { + ServerSocket server = new ServerSocket(0); + int port = server.getLocalPort(); + server.close(); + return port; + } + + public static class Builder { + private File workDir; + private int numTaskTrackers = 1; + private JobConf jobConf; + private Configuration hbaseConf; + private HiveConf hiveConf; + + private boolean miniMRClusterEnabled = true; + private boolean miniHBaseClusterEnabled = true; + private boolean miniHiveMetastoreEnabled = true; + private boolean miniZookeeperClusterEnabled = true; + + + private Builder(File workDir) { + this.workDir = workDir; + } + + public Builder numTaskTrackers(int num) { + numTaskTrackers = num; + return this; + } + + public Builder jobConf(JobConf jobConf) { + this.jobConf = jobConf; + return this; + } + + public Builder hbaseConf(Configuration hbaseConf) { + this.hbaseConf = hbaseConf; + return this; + } + + public Builder hiveConf(HiveConf hiveConf) { + this.hiveConf = hiveConf; + return this; + } + + public Builder miniMRClusterEnabled(boolean enabled) { + this.miniMRClusterEnabled = enabled; + return this; + } + + public Builder miniHBaseClusterEnabled(boolean enabled) { + this.miniHBaseClusterEnabled = enabled; + return this; + } + + public Builder miniZookeeperClusterEnabled(boolean enabled) { + this.miniZookeeperClusterEnabled = enabled; + return this; + } + + public Builder miniHiveMetastoreEnabled(boolean enabled) { + this.miniHiveMetastoreEnabled = enabled; + return this; + } + + + public ManyMiniCluster build() { + return new ManyMiniCluster(this); + } + + } +} diff --git a/hcatalog/storage-handlers/hbase/src/test/org/apache/hive/hcatalog/hbase/SkeletonHBaseTest.java b/hcatalog/storage-handlers/hbase/src/test/org/apache/hive/hcatalog/hbase/SkeletonHBaseTest.java new file mode 100644 index 0000000..5bbf17d --- /dev/null +++ b/hcatalog/storage-handlers/hbase/src/test/org/apache/hive/hcatalog/hbase/SkeletonHBaseTest.java @@ -0,0 +1,241 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.hcatalog.hbase; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Random; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hive.conf.HiveConf; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +/** + * Base class for HBase Tests which need a mini cluster instance + */ +public abstract class SkeletonHBaseTest { + + protected static String TEST_DIR = "/tmp/build/test/data/"; + + protected final static String DEFAULT_CONTEXT_HANDLE = "default"; + + protected static Map contextMap = new HashMap(); + protected static Set tableNames = new HashSet(); + + /** + * Allow tests to alter the default MiniCluster configuration. 
+ * (requires static initializer block as all setup here is static) + */ + protected static Configuration testConf = null; + + protected void createTable(String tableName, String[] families) { + try { + HBaseAdmin admin = new HBaseAdmin(getHbaseConf()); + HTableDescriptor tableDesc = new HTableDescriptor(tableName); + for (String family : families) { + HColumnDescriptor columnDescriptor = new HColumnDescriptor(family); + tableDesc.addFamily(columnDescriptor); + } + admin.createTable(tableDesc); + } catch (Exception e) { + e.printStackTrace(); + throw new IllegalStateException(e); + } + + } + + protected String newTableName(String prefix) { + String name = null; + int tries = 100; + do { + name = prefix + "_" + Math.abs(new Random().nextLong()); + } while (tableNames.contains(name) && --tries > 0); + if (tableNames.contains(name)) + throw new IllegalStateException("Couldn't find a unique table name, tableNames size: " + tableNames.size()); + tableNames.add(name); + return name; + } + + + /** + * startup an hbase cluster instance before a test suite runs + */ + @BeforeClass + public static void setup() { + if (!contextMap.containsKey(getContextHandle())) + contextMap.put(getContextHandle(), new Context(getContextHandle())); + + contextMap.get(getContextHandle()).start(); + } + + /** + * shutdown an hbase cluster instance ant the end of the test suite + */ + @AfterClass + public static void tearDown() { + contextMap.get(getContextHandle()).stop(); + } + + /** + * override this with a different context handle if tests suites are run simultaneously + * and ManyMiniCluster instances shouldn't be shared + * @return + */ + public static String getContextHandle() { + return DEFAULT_CONTEXT_HANDLE; + } + + /** + * @return working directory for a given test context, which normally is a test suite + */ + public String getTestDir() { + return contextMap.get(getContextHandle()).getTestDir(); + } + + /** + * @return ManyMiniCluster instance + */ + public ManyMiniCluster getCluster() { + return contextMap.get(getContextHandle()).getCluster(); + } + + /** + * @return configuration of MiniHBaseCluster + */ + public Configuration getHbaseConf() { + return contextMap.get(getContextHandle()).getHbaseConf(); + } + + /** + * @return configuration of MiniMRCluster + */ + public Configuration getJobConf() { + return contextMap.get(getContextHandle()).getJobConf(); + } + + /** + * @return configuration of Hive Metastore + */ + public HiveConf getHiveConf() { + return contextMap.get(getContextHandle()).getHiveConf(); + } + + /** + * @return filesystem used by ManyMiniCluster daemons + */ + public FileSystem getFileSystem() { + return contextMap.get(getContextHandle()).getFileSystem(); + } + + /** + * class used to encapsulate a context which is normally used by + * a single TestSuite or across TestSuites when multi-threaded testing is turned on + */ + public static class Context { + protected String testDir; + protected ManyMiniCluster cluster; + + protected Configuration hbaseConf; + protected Configuration jobConf; + protected HiveConf hiveConf; + + protected FileSystem fileSystem; + + protected int usageCount = 0; + + public Context(String handle) { + try { + testDir = new File(TEST_DIR + "/test_" + handle + "_" + Math.abs(new Random().nextLong()) + "/").getCanonicalPath(); + System.out.println("Cluster work directory: " + testDir); + } catch (IOException e) { + throw new IllegalStateException("Failed to generate testDir", e); + } + } + + public void start() { + if (usageCount++ == 0) { + ManyMiniCluster.Builder 
b = ManyMiniCluster.create(new File(testDir)); + if (testConf != null) { + b.hbaseConf(HBaseConfiguration.create(testConf)); + } + cluster = b.build(); + cluster.start(); + this.hbaseConf = cluster.getHBaseConf(); + jobConf = cluster.getJobConf(); + fileSystem = cluster.getFileSystem(); + hiveConf = cluster.getHiveConf(); + } + } + + public void stop() { + if (--usageCount == 0) { + try { + cluster.stop(); + cluster = null; + } finally { + System.out.println("Trying to cleanup: " + testDir); + try { + FileSystem fs = FileSystem.get(jobConf); + fs.delete(new Path(testDir), true); + } catch (IOException e) { + throw new IllegalStateException("Failed to cleanup test dir", e); + } + + } + } + } + + public String getTestDir() { + return testDir; + } + + public ManyMiniCluster getCluster() { + return cluster; + } + + public Configuration getHbaseConf() { + return hbaseConf; + } + + public Configuration getJobConf() { + return jobConf; + } + + public HiveConf getHiveConf() { + return hiveConf; + } + + public FileSystem getFileSystem() { + return fileSystem; + } + } + +} diff --git a/hcatalog/storage-handlers/hbase/src/test/org/apache/hive/hcatalog/hbase/TestHBaseInputFormat.java b/hcatalog/storage-handlers/hbase/src/test/org/apache/hive/hcatalog/hbase/TestHBaseInputFormat.java new file mode 100644 index 0000000..5cbe86d --- /dev/null +++ b/hcatalog/storage-handlers/hbase/src/test/org/apache/hive/hcatalog/hbase/TestHBaseInputFormat.java @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hive.hcatalog.hbase; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.TableInputFormat; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.hbase.HBaseSerDe; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hive.hcatalog.cli.HCatDriver; +import org.apache.hive.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer; +import org.apache.hive.hcatalog.common.HCatConstants; +import org.apache.hive.hcatalog.common.HCatException; +import org.apache.hive.hcatalog.common.HCatUtil; +import org.apache.hive.hcatalog.data.HCatRecord; +import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; +import org.apache.hive.hcatalog.data.schema.HCatSchema; +import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; +import org.apache.hive.hcatalog.mapreduce.InputJobInfo; +import org.junit.Test; + +public class TestHBaseInputFormat extends SkeletonHBaseTest { + + private static HiveConf hcatConf; + private static HCatDriver hcatDriver; + private final byte[] FAMILY = Bytes.toBytes("testFamily"); + private final byte[] QUALIFIER1 = Bytes.toBytes("testQualifier1"); + private final byte[] QUALIFIER2 = Bytes.toBytes("testQualifier2"); + + public TestHBaseInputFormat() throws Exception { + hcatConf = getHiveConf(); + hcatConf.set(ConfVars.SEMANTIC_ANALYZER_HOOK.varname, + HCatSemanticAnalyzer.class.getName()); + URI fsuri = getFileSystem().getUri(); + Path whPath = new Path(fsuri.getScheme(), fsuri.getAuthority(), + getTestDir()); + hcatConf.set(HiveConf.ConfVars.HADOOPFS.varname, fsuri.toString()); + hcatConf.set(ConfVars.METASTOREWAREHOUSE.varname, whPath.toString()); + + //Add hbase properties + + for (Map.Entry el : getHbaseConf()) { + if (el.getKey().startsWith("hbase.")) { + hcatConf.set(el.getKey(), el.getValue()); + } + } + + SessionState.start(new CliSessionState(hcatConf)); + hcatDriver = new HCatDriver(); + + } + + private List generatePuts(int num, String tableName) throws IOException { + + List columnFamilies = Arrays.asList("testFamily"); + List myPuts; + myPuts = new ArrayList(); + for (int i = 1; i <= num; i++) { + Put put = new Put(Bytes.toBytes("testRow")); + put.add(FAMILY, QUALIFIER1, i, Bytes.toBytes("textValue-" + i)); + put.add(FAMILY, QUALIFIER2, i, Bytes.toBytes("textValue-" + i)); + myPuts.add(put); + } + return myPuts; + } + + private void 
populateHBaseTable(String tName, int revisions) throws IOException { + List myPuts = generatePuts(revisions, tName); + HTable table = new HTable(getHbaseConf(), Bytes.toBytes(tName)); + table.put(myPuts); + } + + @Test + public void TestHBaseTableReadMR() throws Exception { + String tableName = newTableName("MyTable"); + String databaseName = newTableName("MyDatabase"); + //Table name will be lower case unless specified by hbase.table.name property + String hbaseTableName = (databaseName + "." + tableName).toLowerCase(); + String db_dir = new Path(getTestDir(), "hbasedb").toString(); + + String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + + db_dir + "'"; + String tableQuery = "CREATE TABLE " + databaseName + "." + tableName + + "(key string, testqualifier1 string, testqualifier2 string) STORED BY " + + "'org.apache.hadoop.hive.hbase.HBaseStorageHandler'" + + " WITH SERDEPROPERTIES ('hbase.columns.mapping'=':key,testFamily:testQualifier1,testFamily:testQualifier2')" ; + + CommandProcessorResponse responseOne = hcatDriver.run(dbquery); + assertEquals(0, responseOne.getResponseCode()); + CommandProcessorResponse responseTwo = hcatDriver.run(tableQuery); + assertEquals(0, responseTwo.getResponseCode()); + + HBaseAdmin hAdmin = new HBaseAdmin(getHbaseConf()); + boolean doesTableExist = hAdmin.tableExists(hbaseTableName); + assertTrue(doesTableExist); + + populateHBaseTable(hbaseTableName, 5); + Configuration conf = new Configuration(hcatConf); + conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, + HCatUtil.serialize(getHiveConf().getAllProperties())); + + conf.set(HBaseSerDe.HBASE_TABLE_NAME,hbaseTableName); + conf.set(TableInputFormat.INPUT_TABLE, hbaseTableName); + // output settings + Path outputDir = new Path(getTestDir(), "mapred/testHbaseTableMRRead"); + FileSystem fs = getFileSystem(); + if (fs.exists(outputDir)) { + fs.delete(outputDir, true); + } + // create job + Job job = new Job(conf, "hbase-mr-read-test"); + job.setJarByClass(this.getClass()); + job.setMapperClass(MapReadHTable.class); + MapReadHTable.resetCounters(); + + job.setInputFormatClass(HCatInputFormat.class); + InputJobInfo inputJobInfo = InputJobInfo.create(databaseName, tableName, + null); + HCatInputFormat.setInput(job, inputJobInfo); + job.setOutputFormatClass(TextOutputFormat.class); + TextOutputFormat.setOutputPath(job, outputDir); + job.setMapOutputKeyClass(BytesWritable.class); + job.setMapOutputValueClass(Text.class); + job.setOutputKeyClass(BytesWritable.class); + job.setOutputValueClass(Text.class); + job.setNumReduceTasks(0); + assertTrue(job.waitForCompletion(true)); + // Note: These asserts only works in case of LocalJobRunner as they run in same jvm. + // If using MiniMRCluster, the tests will have to be modified. 
+ assertFalse(MapReadHTable.error); + assertEquals(MapReadHTable.count, 1); + + String dropTableQuery = "DROP TABLE " + hbaseTableName ; + CommandProcessorResponse responseThree = hcatDriver.run(dropTableQuery); + assertEquals(0, responseThree.getResponseCode()); + + boolean isHbaseTableThere = hAdmin.tableExists(hbaseTableName); + assertFalse(isHbaseTableThere); + + String dropDB = "DROP DATABASE " + databaseName; + CommandProcessorResponse responseFour = hcatDriver.run(dropDB); + assertEquals(0, responseFour.getResponseCode()); + } + + @Test + public void TestHBaseTableProjectionReadMR() throws Exception { + + String tableName = newTableName("MyTable"); + //Table name as specified by hbase.table.name property + String hbaseTableName = "MyDB_" + tableName; + String tableQuery = "CREATE TABLE " + tableName + + "(key string, testqualifier1 string, testqualifier2 string) STORED BY " + + "'org.apache.hadoop.hive.hbase.HBaseStorageHandler'" + + " WITH SERDEPROPERTIES ('hbase.columns.mapping'=" + + "':key,testFamily:testQualifier1,testFamily:testQualifier2')" + + " TBLPROPERTIES ('hbase.table.name'='" + hbaseTableName+ "')" ; + + CommandProcessorResponse responseTwo = hcatDriver.run(tableQuery); + assertEquals(0, responseTwo.getResponseCode()); + + HBaseAdmin hAdmin = new HBaseAdmin(getHbaseConf()); + boolean doesTableExist = hAdmin.tableExists(hbaseTableName); + assertTrue(doesTableExist); + + populateHBaseTable(hbaseTableName, 5); + + Configuration conf = new Configuration(hcatConf); + conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, + HCatUtil.serialize(getHiveConf().getAllProperties())); + + // output settings + Path outputDir = new Path(getTestDir(), "mapred/testHBaseTableProjectionReadMR"); + FileSystem fs = getFileSystem(); + if (fs.exists(outputDir)) { + fs.delete(outputDir, true); + } + // create job + Job job = new Job(conf, "hbase-column-projection"); + job.setJarByClass(this.getClass()); + job.setMapperClass(MapReadProjHTable.class); + job.setInputFormatClass(HCatInputFormat.class); + InputJobInfo inputJobInfo = InputJobInfo.create( + MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName, null); + HCatInputFormat.setOutputSchema(job, getProjectionSchema()); + HCatInputFormat.setInput(job, inputJobInfo); + job.setOutputFormatClass(TextOutputFormat.class); + TextOutputFormat.setOutputPath(job, outputDir); + job.setMapOutputKeyClass(BytesWritable.class); + job.setMapOutputValueClass(Text.class); + job.setOutputKeyClass(BytesWritable.class); + job.setOutputValueClass(Text.class); + job.setNumReduceTasks(0); + assertTrue(job.waitForCompletion(true)); + assertFalse(MapReadProjHTable.error); + assertEquals(MapReadProjHTable.count, 1); + + String dropTableQuery = "DROP TABLE " + tableName; + CommandProcessorResponse responseThree = hcatDriver.run(dropTableQuery); + assertEquals(0, responseThree.getResponseCode()); + + boolean isHbaseTableThere = hAdmin.tableExists(hbaseTableName); + assertFalse(isHbaseTableThere); + } + + + static class MapReadHTable + extends + Mapper, Text> { + + static boolean error = false; + static int count = 0; + + @Override + public void map(ImmutableBytesWritable key, HCatRecord value, + Context context) throws IOException, InterruptedException { + boolean correctValues = (value.size() == 3) + && (value.get(0).toString()).equalsIgnoreCase("testRow") + && (value.get(1).toString()).equalsIgnoreCase("textValue-5") + && (value.get(2).toString()).equalsIgnoreCase("textValue-5"); + + if (correctValues == false) { + error = true; + } + count++; + } + + public static void 
resetCounters() { + error = false; + count = 0; + } + } + + static class MapReadProjHTable + extends + Mapper, Text> { + + static boolean error = false; + static int count = 0; + @Override + public void map(ImmutableBytesWritable key, HCatRecord value, + Context context) throws IOException, InterruptedException { + boolean correctValues = (value.size() == 2) + && (value.get(0).toString()).equalsIgnoreCase("testRow") + && (value.get(1).toString()).equalsIgnoreCase("textValue-5"); + + if (correctValues == false) { + error = true; + } + count++; + } + } + + private HCatSchema getProjectionSchema() throws HCatException { + + HCatSchema schema = new HCatSchema(new ArrayList()); + schema.append(new HCatFieldSchema("key", HCatFieldSchema.Type.STRING, + "")); + schema.append(new HCatFieldSchema("testqualifier1", + HCatFieldSchema.Type.STRING, "")); + return schema; + } + + +} diff --git a/hcatalog/storage-handlers/hbase/src/test/org/apache/hive/hcatalog/hbase/TestHiveHBaseStorageHandler.java b/hcatalog/storage-handlers/hbase/src/test/org/apache/hive/hcatalog/hbase/TestHiveHBaseStorageHandler.java new file mode 100644 index 0000000..6882c54 --- /dev/null +++ b/hcatalog/storage-handlers/hbase/src/test/org/apache/hive/hcatalog/hbase/TestHiveHBaseStorageHandler.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hive.hcatalog.hbase; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.net.URI; +import java.util.Map; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hive.hcatalog.cli.HCatDriver; +import org.apache.hive.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer; +import org.junit.Test; + +public class TestHiveHBaseStorageHandler extends SkeletonHBaseTest { + + private static HiveConf hcatConf; + private static HCatDriver hcatDriver; + private static Warehouse wh; + + public void Initialize() throws Exception { + + hcatConf = getHiveConf(); + hcatConf.set(ConfVars.SEMANTIC_ANALYZER_HOOK.varname, + HCatSemanticAnalyzer.class.getName()); + URI fsuri = getFileSystem().getUri(); + Path whPath = new Path(fsuri.getScheme(), fsuri.getAuthority(), + getTestDir()); + hcatConf.set(HiveConf.ConfVars.HADOOPFS.varname, fsuri.toString()); + hcatConf.set(ConfVars.METASTOREWAREHOUSE.varname, whPath.toString()); + + //Add hbase properties + for (Map.Entry el : getHbaseConf()) { + if (el.getKey().startsWith("hbase.")) { + hcatConf.set(el.getKey(), el.getValue()); + } + } + + SessionState.start(new CliSessionState(hcatConf)); + hcatDriver = new HCatDriver(); + + } + + @Test + public void testTableCreateDrop() throws Exception { + Initialize(); + + hcatDriver.run("drop table test_table"); + CommandProcessorResponse response = hcatDriver + .run("create table test_table(key int, value string) STORED BY " + + "'org.apache.hadoop.hive.hbase.HBaseStorageHandler'" + + " WITH SERDEPROPERTIES ('hbase.columns.mapping'=':key,cf1:val')"); + + assertEquals(0, response.getResponseCode()); + + HBaseAdmin hAdmin = new HBaseAdmin(getHbaseConf()); + boolean doesTableExist = hAdmin.tableExists("test_table"); + + assertTrue(doesTableExist); + + hcatDriver.run("drop table test_table"); + doesTableExist = hAdmin.tableExists("test_table"); + assertTrue(doesTableExist == false); + + } + public void testHBaseTableCreateDrop() throws Exception { + Initialize(); + + hcatDriver.run("drop table test_table"); + CommandProcessorResponse response = hcatDriver + .run("create table test_table(key int, value string) STORED BY " + + "'org.apache.hadoop.hive.hbase.HBaseStorageHandler'" + + " WITH SERDEPROPERTIES ('hbase.columns.mapping'=':key,cf1:val')"); + + assertEquals(0, response.getResponseCode()); + + HBaseAdmin hAdmin = new HBaseAdmin(getHbaseConf()); + boolean doesTableExist = hAdmin.tableExists("test_table"); + + assertTrue(doesTableExist); + + hcatDriver.run("drop table test_table"); + doesTableExist = hAdmin.tableExists("test_table"); + assertTrue(doesTableExist == false); + + } + + @Test + public void testTableCreateDropDifferentCase() throws Exception { + Initialize(); + + hcatDriver.run("drop table test_Table"); + CommandProcessorResponse response = hcatDriver + .run("create table test_Table(key int, value string) STORED BY " + + "'org.apache.hadoop.hive.hbase.HBaseStorageHandler'" + + " WITH SERDEPROPERTIES 
('hbase.columns.mapping'=':key,cf1:val')"); + + assertEquals(0, response.getResponseCode()); + + //HBase table gets created with the specific case + HBaseAdmin hAdmin = new HBaseAdmin(getHbaseConf()); + boolean doesTableExist = hAdmin.tableExists("test_table"); + + assertTrue(doesTableExist); + + hcatDriver.run("drop table test_table"); + doesTableExist = hAdmin.tableExists("test_table"); + assertTrue(doesTableExist == false); + } + + @Test + public void testTableCreateDropCaseSensitive() throws Exception { + Initialize(); + + hcatDriver.run("drop table test_Table"); + CommandProcessorResponse response = hcatDriver + .run("create table test_Table(key int, value string) STORED BY " + + "'org.apache.hadoop.hive.hbase.HBaseStorageHandler'" + + " WITH SERDEPROPERTIES ('hbase.columns.mapping'=':key,cf1:val')" + + " TBLPROPERTIES ('hbase.table.name'='CaseSensitiveTable')"); + + assertEquals(0, response.getResponseCode()); + + HBaseAdmin hAdmin = new HBaseAdmin(getHbaseConf()); + boolean doesTableExist = hAdmin.tableExists("CaseSensitiveTable"); + + assertTrue(doesTableExist); + + + hcatDriver.run("drop table test_table"); + doesTableExist = hAdmin.tableExists("CaseSensitiveTable"); + assertTrue(doesTableExist == false); + + } + + @Test + public void testTableDropNonExistent() throws Exception { + Initialize(); + + hcatDriver.run("drop table mytable"); + CommandProcessorResponse response = hcatDriver + .run("create table mytable(key int, value string) STORED BY " + + "'org.apache.hadoop.hive.hbase.HBaseStorageHandler'" + + " WITH SERDEPROPERTIES ('hbase.columns.mapping'=':key,cf1:val')"); + + assertEquals(0, response.getResponseCode()); + + HBaseAdmin hAdmin = new HBaseAdmin(getHbaseConf()); + boolean doesTableExist = hAdmin.tableExists("mytable"); + assertTrue(doesTableExist); + + //Now delete the table from hbase + if (hAdmin.isTableEnabled("mytable")) { + hAdmin.disableTable("mytable"); + } + hAdmin.deleteTable("mytable"); + doesTableExist = hAdmin.tableExists("mytable"); + assertTrue(doesTableExist == false); + + CommandProcessorResponse responseTwo = hcatDriver.run("drop table mytable"); + assertTrue(responseTwo.getResponseCode() != 0); + + } + + @Test + public void testTableCreateExternal() throws Exception { + + String tableName = "testTable"; + HBaseAdmin hAdmin = new HBaseAdmin(getHbaseConf()); + + HTableDescriptor tableDesc = new HTableDescriptor(tableName); + tableDesc.addFamily(new HColumnDescriptor(Bytes.toBytes("key"))); + tableDesc.addFamily(new HColumnDescriptor(Bytes.toBytes("familyone"))); + tableDesc.addFamily(new HColumnDescriptor(Bytes.toBytes("familytwo"))); + + hAdmin.createTable(tableDesc); + boolean doesTableExist = hAdmin.tableExists(tableName); + assertTrue(doesTableExist); + + hcatDriver.run("drop table mytabletwo"); + CommandProcessorResponse response = hcatDriver + .run("create external table mytabletwo(key int, valueone string, valuetwo string) STORED BY " + + "'org.apache.hadoop.hive.hbase.HBaseStorageHandler'" + + " WITH SERDEPROPERTIES ('hbase.columns.mapping'=':key,familyone:val,familytwo:val') " + + " TBLPROPERTIES ('hbase.table.name'='testTable')"); + + assertEquals(0, response.getResponseCode()); + + } + + +} diff --git a/hcatalog/storage-handlers/hbase/src/test/org/apache/hive/hcatalog/hbase/TestHiveHBaseTableOutputFormat.java b/hcatalog/storage-handlers/hbase/src/test/org/apache/hive/hcatalog/hbase/TestHiveHBaseTableOutputFormat.java new file mode 100644 index 0000000..dde8d7d --- /dev/null +++ 
b/hcatalog/storage-handlers/hbase/src/test/org/apache/hive/hcatalog/hbase/TestHiveHBaseTableOutputFormat.java @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.hcatalog.hbase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.mapred.TableOutputFormat; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.hbase.HBaseSerDe; +import org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hive.hcatalog.cli.HCatDriver; +import org.apache.hive.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer; +import org.apache.hive.hcatalog.common.ErrorType; +import org.apache.hive.hcatalog.common.HCatConstants; +import org.apache.hive.hcatalog.common.HCatException; +import org.apache.hive.hcatalog.common.HCatUtil; +import org.apache.hive.hcatalog.data.DefaultHCatRecord; +import org.apache.hive.hcatalog.data.HCatRecord; +import org.apache.hive.hcatalog.data.schema.HCatSchema; +import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat; +import org.apache.hive.hcatalog.mapreduce.OutputJobInfo; +import org.junit.Test; + +import java.io.IOException; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Test HBaseDirectOUtputFormat and HBaseStorageHandler using a MiniCluster + */ +public class TestHiveHBaseTableOutputFormat extends SkeletonHBaseTest { + + private final HiveConf allConf; + private final HCatDriver hcatDriver; + + public TestHiveHBaseTableOutputFormat() { + allConf = getHiveConf(); + 
allConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname, + HCatSemanticAnalyzer.class.getName()); + allConf.set(HiveConf.ConfVars.HADOOPFS.varname, getFileSystem().getUri().toString()); + allConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, new Path(getTestDir(),"warehouse").toString()); + + //Add hbase properties + for (Map.Entry el : getHbaseConf()) + if (el.getKey().startsWith("hbase.")) { + allConf.set(el.getKey(), el.getValue()); + } + SessionState.start(new CliSessionState(allConf)); + hcatDriver = new HCatDriver(); + } + + @Test + public void directOutputFormatTest() throws IOException, ClassNotFoundException, InterruptedException { + String testName = "directOutputFormatTest"; + Path methodTestDir = new Path(getTestDir(),testName); + + String tableName = newTableName(testName).toLowerCase(); + String familyName = "my_family"; + byte[] familyNameBytes = Bytes.toBytes(familyName); + + //include hbase config in conf file + Configuration conf = new Configuration(allConf); + conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties())); + + //create table + createTable(tableName,new String[]{familyName}); + + String data[] = { + "1,english:ONE,spanish:UNO", + "2,english:TWO,spanish:DOS", + "3,english:THREE,spanish:TRES"}; + + // input/output settings + Path inputPath = new Path(methodTestDir,"mr_input"); + getFileSystem().mkdirs(inputPath); + FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt")); + for(String line: data) + os.write(Bytes.toBytes(line + "\n")); + os.close(); + + //create job + JobConf job = new JobConf(conf); + job.setJobName(testName); + job.setWorkingDirectory(new Path(methodTestDir,"mr_work")); + job.setJarByClass(this.getClass()); + job.setMapperClass(MapWrite.class); + + job.setInputFormat(org.apache.hadoop.mapred.TextInputFormat.class); + org.apache.hadoop.mapred.TextInputFormat.setInputPaths(job, inputPath); + // why we need to set all the 3 properties?? 
+ job.setOutputFormat(HiveHBaseTableOutputFormat.class); + job.set(HBaseSerDe.HBASE_TABLE_NAME,tableName); + job.set(TableOutputFormat.OUTPUT_TABLE, tableName); + job.set(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".hbase.mapreduce.outputTableName", tableName); + + try { + OutputJobInfo outputJobInfo = OutputJobInfo.create("default", tableName, null); + job.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, + HCatUtil.serialize(outputJobInfo)); + } catch (Exception ex) { + throw new IOException("Serialization error " + ex.getMessage(), ex); + } + + job.setMapOutputKeyClass(BytesWritable.class); + job.setMapOutputValueClass(HCatRecord.class); + job.setOutputKeyClass(BytesWritable.class); + job.setOutputValueClass(HCatRecord.class); + job.setNumReduceTasks(0); + System.getProperty("java.classpath"); + RunningJob runJob = JobClient.runJob(job); + runJob.waitForCompletion(); + assertTrue(runJob.isSuccessful()); + + //verify + HTable table = new HTable(conf, tableName); + Scan scan = new Scan(); + scan.addFamily(familyNameBytes); + ResultScanner scanner = table.getScanner(scan); + int index=0; + for(Result result: scanner) { + String vals[] = data[index].toString().split(","); + for(int i=1;i mapperClass, + OutputJobInfo outputJobInfo, Path inputPath) throws IOException { + + try { + //now setting the schema + HiveConf hiveConf = HCatUtil.getHiveConf(conf); + HiveMetaStoreClient client = HCatUtil.getHiveClient(hiveConf); + Table table = client.getTable(outputJobInfo.getDatabaseName(), outputJobInfo.getTableName()); + StorageDescriptor tblSD = table.getSd(); + if (tblSD == null) { + throw new HCatException( + "Cannot construct partition info from an empty storage descriptor."); + } + HCatSchema tableSchema = new HCatSchema(HCatUtil.getHCatFieldSchemaList(tblSD.getCols())); + outputJobInfo.setOutputSchema(tableSchema); + } + catch(Exception e) { + if( e instanceof HCatException ) { + throw (HCatException) e; + } else { + throw new HCatException(ErrorType.ERROR_SET_OUTPUT, e); + } + } + conf.set(HBaseSerDe.HBASE_TABLE_NAME,outputJobInfo.getDatabaseName()+ "." + outputJobInfo.getTableName()); + conf.set(org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_NAME,outputJobInfo.getDatabaseName()+ "." + outputJobInfo.getTableName()); + conf.set(TableOutputFormat.OUTPUT_TABLE, outputJobInfo.getDatabaseName() + "."+ outputJobInfo.getTableName()); + conf.set(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".hbase.mapreduce.outputTableName", outputJobInfo.getDatabaseName() + "." 
+ outputJobInfo.getTableName()); + conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO,HCatUtil.serialize(outputJobInfo)); + + Job job = new Job(conf, jobName); + job.setWorkingDirectory(workingDir); + job.setJarByClass(this.getClass()); + job.setMapperClass(mapperClass); + + job.setInputFormatClass(TextInputFormat.class); + TextInputFormat.setInputPaths(job, inputPath); + //job.setOutputFormatClass(HiveHBaseTableOutputFormat.class); + job.setOutputFormatClass(HCatOutputFormat.class); + HCatOutputFormat.setOutput(job, outputJobInfo); + job.setMapOutputKeyClass(BytesWritable.class); + job.setMapOutputValueClass(HCatRecord.class); + job.setOutputKeyClass(BytesWritable.class); + job.setOutputValueClass(HCatRecord.class); + + job.setNumReduceTasks(0); + return job; + } + + public static class MapHCatWrite extends Mapper { + + @Override + public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { + OutputJobInfo jobInfo = (OutputJobInfo)HCatUtil.deserialize(context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO)); + HCatRecord record = new DefaultHCatRecord(3); + HCatSchema schema = jobInfo.getOutputSchema(); + String vals[] = value.toString().split(","); + record.setInteger("key",schema,Integer.parseInt(vals[0])); + for(int i=1;i { + + @Override + public void configure(JobConf job) { + } + + @Override + public void close() throws IOException { + } + + @Override + public void map(LongWritable key, Text value, + OutputCollector output, Reporter reporter) + throws IOException { + String vals[] = value.toString().split(","); + Put put = new Put(Bytes.toBytes(vals[0])); + for(int i=1;i el : getHbaseConf()) { + if (el.getKey().startsWith("hbase.")) { + hcatConf.set(el.getKey(), el.getValue()); + } + } + + driver = new Driver(hcatConf); + SessionState.start(new CliSessionState(hcatConf)); + + } + + private void populateHBaseTable(String tName) throws IOException { + List myPuts = generatePuts(tName); + HTable table = new HTable(getHbaseConf(), Bytes.toBytes(tName)); + table.put(myPuts); + } + + private List generatePuts(String tableName) throws IOException { + + List columnFamilies = Arrays.asList("testFamily"); + List myPuts; + myPuts = new ArrayList(); + for (int i = 1; i <=10; i++) { + Put put = new Put(Bytes.toBytes(i)); + put.add(FAMILY, QUALIFIER1, 1, Bytes.toBytes("textA-" + i)); + put.add(FAMILY, QUALIFIER2, 1, Bytes.toBytes("textB-" + i)); + myPuts.add(put); + } + return myPuts; + } + + public static void createTestDataFile(String filename) throws IOException { + FileWriter writer = null; + int LOOP_SIZE = 10; + float f = -100.1f; + try { + File file = new File(filename); + file.deleteOnExit(); + writer = new FileWriter(file); + + for (int i =1; i <= LOOP_SIZE; i++) { + writer.write(i+ "\t" +(f+i)+ "\t" + "textB-" + i + "\n"); + } + } finally { + if (writer != null) { + writer.close(); + } + } + + } + + @Test + public void testPigHBaseSchema() throws Exception { + Initialize(); + + String tableName = newTableName("MyTable"); + String databaseName = newTableName("MyDatabase"); + //Table name will be lower case unless specified by hbase.table.name property + String hbaseTableName = "testTable"; + String db_dir = getTestDir() + "/hbasedb"; + + String dbQuery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + + db_dir + "'"; + + String deleteQuery = "DROP TABLE "+databaseName+"."+tableName; + + String tableQuery = "CREATE TABLE " + databaseName + "." 
+ tableName + + "(key float, testqualifier1 string, testqualifier2 int) STORED BY " + + "'org.apache.hadoop.hive.hbase.HBaseStorageHandler'" + + " WITH SERDEPROPERTIES ('hbase.columns.mapping'=':key,testFamily:testQualifier1,testFamily:testQualifier2')" + + " TBLPROPERTIES ('hbase.table.name'='"+hbaseTableName+"')"; + + CommandProcessorResponse responseOne = driver.run(deleteQuery); + assertEquals(0, responseOne.getResponseCode()); + + + CommandProcessorResponse responseTwo = driver.run(dbQuery); + assertEquals(0, responseTwo.getResponseCode()); + + + CommandProcessorResponse responseThree = driver.run(tableQuery); + + HBaseAdmin hAdmin = new HBaseAdmin(getHbaseConf()); + boolean doesTableExist = hAdmin.tableExists(hbaseTableName); + assertTrue(doesTableExist); + + PigServer server = new PigServer(ExecType.LOCAL,hcatConf.getAllProperties()); + server.registerQuery("A = load '"+databaseName+"."+tableName+"' using org.apache.hive.hcatalog.pig.HCatLoader();"); + + Schema dumpedASchema = server.dumpSchema("A"); + + List fields = dumpedASchema.getFields(); + assertEquals(3, fields.size()); + + assertEquals(DataType.FLOAT,fields.get(0).type); + assertEquals("key",fields.get(0).alias.toLowerCase()); + + assertEquals( DataType.CHARARRAY,fields.get(1).type); + assertEquals("testQualifier1".toLowerCase(), fields.get(1).alias.toLowerCase()); + + assertEquals( DataType.INTEGER,fields.get(2).type); + assertEquals("testQualifier2".toLowerCase(), fields.get(2).alias.toLowerCase()); + + } + + + @Test + public void testPigFilterProjection() throws Exception { + Initialize(); + + String tableName = newTableName("MyTable"); + String databaseName = newTableName("MyDatabase"); + //Table name will be lower case unless specified by hbase.table.name property + String hbaseTableName = (databaseName + "." + tableName).toLowerCase(); + String db_dir = getTestDir() + "/hbasedb"; + + String dbQuery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + + db_dir + "'"; + + String deleteQuery = "DROP TABLE "+databaseName+"."+tableName; + + String tableQuery = "CREATE TABLE " + databaseName + "." 
+ tableName + + "(key int, testqualifier1 string, testqualifier2 string) STORED BY " + + "'org.apache.hadoop.hive.hbase.HBaseStorageHandler'" + + " WITH SERDEPROPERTIES ('hbase.columns.mapping'=':key,testFamily:testQualifier1,testFamily:testQualifier2')" + + " TBLPROPERTIES ('hbase.table.default.storage.type'='binary')"; + + CommandProcessorResponse responseOne = driver.run(deleteQuery); + assertEquals(0, responseOne.getResponseCode()); + + + CommandProcessorResponse responseTwo = driver.run(dbQuery); + assertEquals(0, responseTwo.getResponseCode()); + + + CommandProcessorResponse responseThree = driver.run(tableQuery); + + HBaseAdmin hAdmin = new HBaseAdmin(getHbaseConf()); + boolean doesTableExist = hAdmin.tableExists(hbaseTableName); + assertTrue(doesTableExist); + + populateHBaseTable(hbaseTableName); + + Configuration conf = new Configuration(getHbaseConf()); + HTable table = new HTable(conf, hbaseTableName); + Scan scan = new Scan(); + scan.addFamily(Bytes.toBytes("testFamily")); + ResultScanner scanner = table.getScanner(scan); + int index=1; + + PigServer server = new PigServer(ExecType.LOCAL,hcatConf.getAllProperties()); + server.registerQuery("A = load '"+databaseName+"."+tableName+"' using org.apache.hive.hcatalog.pig.HCatLoader();"); + server.registerQuery("B = filter A by key < 5;"); + server.registerQuery("C = foreach B generate key,testqualifier2;"); + Iterator itr = server.openIterator("C"); + //verify if the filter is correct and returns 2 rows and contains 2 columns and the contents match + while(itr.hasNext()){ + Tuple t = itr.next(); + assertTrue(t.size() == 2); + assertTrue(t.get(0).getClass() == Integer.class); + assertEquals(index,t.get(0)); + assertTrue(t.get(1).getClass() == String.class); + assertEquals("textB-"+index,t.get(1)); + index++; + } + assertEquals(index-1,4); + } + + @Test + public void testPigPopulation() throws Exception { + Initialize(); + + String tableName = newTableName("MyTable"); + String databaseName = newTableName("MyDatabase"); + //Table name will be lower case unless specified by hbase.table.name property + String hbaseTableName = (databaseName + "." + tableName).toLowerCase(); + String db_dir = getTestDir() + "/hbasedb"; + String POPTXT_FILE_NAME = db_dir+"testfile.txt"; + float f = -100.1f; + + String dbQuery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + + db_dir + "'"; + + String deleteQuery = "DROP TABLE "+databaseName+"."+tableName; + + String tableQuery = "CREATE TABLE " + databaseName + "." 
+ tableName + + "(key int, testqualifier1 float, testqualifier2 string) STORED BY " + + "'org.apache.hadoop.hive.hbase.HBaseStorageHandler'" + + " WITH SERDEPROPERTIES ('hbase.columns.mapping'=':key,testFamily:testQualifier1,testFamily:testQualifier2')" + + " TBLPROPERTIES ('hbase.table.default.storage.type'='binary')"; + + + String selectQuery = "SELECT * from "+databaseName.toLowerCase()+"."+tableName.toLowerCase(); + + + CommandProcessorResponse responseOne = driver.run(deleteQuery); + assertEquals(0, responseOne.getResponseCode()); + + + CommandProcessorResponse responseTwo = driver.run(dbQuery); + assertEquals(0, responseTwo.getResponseCode()); + + + CommandProcessorResponse responseThree = driver.run(tableQuery); + + HBaseAdmin hAdmin = new HBaseAdmin(getHbaseConf()); + boolean doesTableExist = hAdmin.tableExists(hbaseTableName); + assertTrue(doesTableExist); + + + createTestDataFile(POPTXT_FILE_NAME); + + PigServer server = new PigServer(ExecType.LOCAL,hcatConf.getAllProperties()); + server.registerQuery("A = load '"+POPTXT_FILE_NAME+"' using PigStorage() as (key:int, testqualifier1:float, testqualifier2:chararray);"); + server.registerQuery("B = filter A by (key > 2) AND (key < 8) ;"); + server.registerQuery("store B into '"+databaseName.toLowerCase()+"."+tableName.toLowerCase()+"' using org.apache.hive.hcatalog.pig.HCatStorer();"); + server.registerQuery("C = load '"+databaseName.toLowerCase()+"."+tableName.toLowerCase()+"' using org.apache.hive.hcatalog.pig.HCatLoader();"); + // Schema should be same + Schema dumpedBSchema = server.dumpSchema("C"); + + List fields = dumpedBSchema.getFields(); + assertEquals(3, fields.size()); + + assertEquals(DataType.INTEGER,fields.get(0).type); + assertEquals("key",fields.get(0).alias.toLowerCase()); + + assertEquals( DataType.FLOAT,fields.get(1).type); + assertEquals("testQualifier1".toLowerCase(), fields.get(1).alias.toLowerCase()); + + assertEquals( DataType.CHARARRAY,fields.get(2).type); + assertEquals("testQualifier2".toLowerCase(), fields.get(2).alias.toLowerCase()); + + //Query the hbase table and check the key is valid and only 5 are present + Configuration conf = new Configuration(getHbaseConf()); + HTable table = new HTable(conf, hbaseTableName); + Scan scan = new Scan(); + scan.addFamily(Bytes.toBytes("testFamily")); + byte[] familyNameBytes = Bytes.toBytes("testFamily"); + ResultScanner scanner = table.getScanner(scan); + int index=3; + int count=0; + for(Result result: scanner) { + //key is correct + assertEquals(index,Bytes.toInt(result.getRow())); + //first column exists + assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes("testQualifier1"))); + //value is correct + assertEquals((index+f),Bytes.toFloat(result.getValue(familyNameBytes,Bytes.toBytes("testQualifier1"))),0); + + //second column exists + assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes("testQualifier2"))); + //value is correct + assertEquals(("textB-"+index).toString(),Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes("testQualifier2")))); + index++; + count++; + } + // 5 rows should be returned + assertEquals(count,5); + + //Check if hive returns results correctly + driver.run(selectQuery); + ArrayList result = new ArrayList(); + driver.getResults(result); + //Query using the hive command line + assertEquals(5, result.size()); + Iterator itr = result.iterator(); + for(int i = 3; i <= 7; i++) { + String tokens[] = itr.next().split("\\s+"); + assertEquals(i,Integer.parseInt(tokens[0])); + 
assertEquals(i+f,Float.parseFloat(tokens[1]),0); + assertEquals(("textB-"+i).toString(),tokens[2]); + } + + //delete the table from the database + CommandProcessorResponse responseFour = driver.run(deleteQuery); + assertEquals(0, responseFour.getResponseCode()); + + } + +}
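The patch above introduces ManyMiniCluster and SkeletonHBaseTest as shared scaffolding for the storage-handler test suites. As a rough sketch of how a new suite is expected to plug into that scaffolding, the following illustrative test is not part of the patch: the class name is invented, but every helper it calls (newTableName, createTable, getHbaseConf, the inherited @BeforeClass/@AfterClass context handling) is defined in this diff, and HBaseAdmin is used the same way the patched tests use it.

package org.apache.hive.hcatalog.hbase;

import static org.junit.Assert.assertTrue;

import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.junit.Test;

// Illustrative only; not part of this patch.
public class ExampleSkeletonUsageTest extends SkeletonHBaseTest {

    @Test
    public void tableIsVisibleThroughMiniCluster() throws Exception {
        // newTableName() and createTable() come from SkeletonHBaseTest; the shared
        // ManyMiniCluster context for this suite is started once in the inherited
        // @BeforeClass setup and torn down in @AfterClass.
        String tableName = newTableName("example_table").toLowerCase();
        createTable(tableName, new String[]{"testFamily"});

        // getHbaseConf() exposes the MiniHBaseCluster configuration held by the
        // suite's Context, so plain HBase client calls can verify the table.
        HBaseAdmin admin = new HBaseAdmin(getHbaseConf());
        assertTrue(admin.tableExists(tableName));
    }
}

The per-handle Context with its usage count is what makes this work: suites that share the default context handle reuse one ManyMiniCluster instance per JVM, while a suite that must run in isolation can override getContextHandle() to get its own cluster and working directory.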