diff --git storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java
new file mode 100644
index 0000000..706c911
--- /dev/null
+++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+
+import java.io.IOException;
+
+/**
+ * "Direct" implementation of OutputFormat for HBase. Uses the HTable client's put API to write each row to HBase one at a
+ * time. Presently it just uses TableOutputFormat as the underlying implementation; in the future we can
+ * tune this to make the writes faster, such as by permanently disabling the WAL, caching, etc.
+ */
+class HBaseDirectOutputFormat extends OutputFormat<WritableComparable<?>, Writable> implements Configurable {
+
+    private TableOutputFormat<WritableComparable<?>> outputFormat;
+
+    public HBaseDirectOutputFormat() {
+        this.outputFormat = new TableOutputFormat<WritableComparable<?>>();
+    }
+
+    @Override
+    public RecordWriter<WritableComparable<?>, Writable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+        return outputFormat.getRecordWriter(context);
+    }
+
+    @Override
+    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
+        outputFormat.checkOutputSpecs(context);
+    }
+
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
+        return outputFormat.getOutputCommitter(context);
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+        String tableName = conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY);
+        conf = new Configuration(conf);
+        conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
+        outputFormat.setConf(conf);
+    }
+
+    @Override
+    public Configuration getConf() {
+        return outputFormat.getConf();
+    }
+}
diff --git storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java
new file mode 100644
index 0000000..241866d
--- /dev/null
+++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
+import org.apache.hcatalog.mapreduce.HCatTableInfo;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * HBase storage driver implementation which uses "direct" writes to HBase for writing out records.
+ */
+public class HBaseDirectOutputStorageDriver extends HCatOutputStorageDriver {
+    private HCatTableInfo tableInfo;
+    private HBaseDirectOutputFormat outputFormat;
+    private ResultConverter converter;
+    private OutputJobInfo outputJobInfo;
+    private HCatSchema schema;
+    private HCatSchema outputSchema;
+
+    @Override
+    public void initialize(JobContext context, Properties hcatProperties) throws IOException {
+        super.initialize(context, hcatProperties);
+        String jobString = context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+        if( jobString == null ) {
+            throw new IOException("OutputJobInfo information not found in JobContext. HCatOutputFormat.setOutput() not called?");
+        }
+        outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(jobString);
+        tableInfo = outputJobInfo.getTableInfo();
+        schema = tableInfo.getDataColumns();
+
+        List<FieldSchema> fields = HCatUtil.getFieldSchemaList(outputSchema.getFields());
+        hcatProperties.setProperty(Constants.LIST_COLUMNS,
+                MetaStoreUtils.getColumnNamesFromFieldSchema(fields));
+        hcatProperties.setProperty(Constants.LIST_COLUMN_TYPES,
+                MetaStoreUtils.getColumnTypesFromFieldSchema(fields));
+
+        //override table properties with user defined ones
+        //in the future we should be more selective on what to override
+        hcatProperties.putAll(outputJobInfo.getProperties());
+        //outputSchema should be set by HCatOutputFormat calling setSchema, prior to initialize being called
+        converter = new HBaseSerDeResultConverter(schema,
+                outputSchema,
+                hcatProperties);
+        context.getConfiguration().set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, tableInfo.getTableName());
+        outputFormat = new HBaseDirectOutputFormat();
+        outputFormat.setConf(context.getConfiguration());
+    }
+
+    @Override
+    public OutputFormat<? super WritableComparable<?>, ? super Writable> getOutputFormat() throws IOException {
+        return outputFormat;
+    }
+
+    @Override
+    public void setSchema(JobContext jobContext, HCatSchema schema) throws IOException {
+        this.outputSchema = schema;
+    }
+
+    @Override
+    public WritableComparable<?> generateKey(HCatRecord value) throws IOException {
+        //HBase doesn't use KEY as part of output
+        return null;
+    }
+
+    @Override
+    public Writable convertValue(HCatRecord value) throws IOException {
+        return converter.convert(value);
+    }
+
+    @Override
+    public void setPartitionValues(JobContext jobContext, Map<String, String> partitionValues) throws IOException {
+        //no partitions for this driver
+    }
+
+    @Override
+    public Path getWorkFilePath(TaskAttemptContext context, String outputLoc) throws IOException {
+        return null;
+    }
+
+    @Override
+    public void setOutputPath(JobContext jobContext, String location) throws IOException {
+        //no output path
+    }
+
+    @Override
+    public String getOutputLocation(JobContext jobContext, String tableLocation, List<String> partitionCols, Map<String, String> partitionValues, String dynHash) throws IOException {
+        return null;
+    }
+}
diff --git storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java
new file mode 100644
index 0000000..ed130c2
--- /dev/null
+++ storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.ServerSocket;
+
+/**
+ * MiniCluster class composed of a number of Hadoop MiniCluster implementations
+ * and other necessary daemons needed for testing (HBase, Hive MetaStore, Zookeeper, MiniMRCluster)
+ */
+public class ManyMiniCluster {
+
+    private MiniHBaseCluster hbaseCluster;
+    private MiniZooKeeperCluster zookeeperCluster;
+    private MiniDFSCluster dfsCluster;
+    private MiniMRCluster mrCluster;
+    private int zookeeperPort;
+    private String hbaseRoot;
+
+    private Configuration hbaseConf;
+    private Configuration jobConf;
+    private HiveConf hiveConf;
+
+    private String zookeeperDir;
+    private String hbaseDir;
+    private final String workDir;
+
+    private boolean started = false;
+
+    private HiveMetaStoreClient hiveMetaStoreClient;
+
+    /**
+     * @param workDir base working directory for the daemons to use
+     */
+    public ManyMiniCluster(String workDir) {
+        try {
+            File file = new File(workDir, "ManyMiniCluster/");
+            file.mkdirs();
+            this.workDir = file.getCanonicalPath();
+        } catch (IOException e) {
+            throw new IllegalStateException("Failed to generate work dir", e);
+        }
+    }
+
+    protected synchronized void start() {
+        try {
+            if (!started) {
+                FileUtil.fullyDelete(new File(workDir));
+                setupMRCluster();
+                setupZookeeper();
+                setupHBaseCluster();
+                setUpMetastore();
+                started = true;
+            }
+        } catch(Exception e) {
+            throw new IllegalStateException("Failed to setup cluster", e);
+        }
+    }
+
+    protected synchronized void stop() {
+        if (hbaseCluster != null) {
+            HConnectionManager.deleteAllConnections(true);
+            try {
+                hbaseCluster.shutdown();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+            hbaseCluster = null;
+        }
+        if (zookeeperCluster != null) {
+            try {
+                zookeeperCluster.shutdown();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+            zookeeperCluster = null;
+        }
+        if(mrCluster != null) {
+            try {
+                mrCluster.shutdown();
+            } catch(Exception e) {
+                e.printStackTrace();
+            }
+            mrCluster = null;
+        }
+        if(dfsCluster != null) {
+            try {
+                dfsCluster.shutdown();
+            } catch(Exception e) {
+                e.printStackTrace();
+            }
+            dfsCluster = null;
+        }
+        started = false;
+    }
+
+    /**
+     * @return Configuration of mini HBase cluster
+     */
+    public Configuration getHBaseConf() {
+        return HBaseConfiguration.create(hbaseConf);
+    }
+
+    /**
+     * @return Configuration of mini MR cluster
+     */
+    public Configuration getJobConf() {
+        return new Configuration(jobConf);
+    }
+
+    /**
+     * @return Configuration of the Hive Metastore; this is a standalone metastore, not a daemon
+     */
+    public HiveConf getHiveConf() {
+        return new HiveConf(hiveConf);
+    }
+
+    /**
+     * @return Filesystem used by MiniMRCluster and MiniHBaseCluster
+     */
+    public FileSystem getFileSystem() {
+        try {
+            return FileSystem.get(jobConf);
+        } catch (IOException e) {
+            throw new IllegalStateException("Failed to get FileSystem", e);
+        }
+    }
+
+    /**
+     * @return Metastore client instance
+     */
+    public HiveMetaStoreClient getHiveMetaStoreClient() {
+        return hiveMetaStoreClient;
+    }
+
+    private void setupMRCluster() {
+        try {
+            final int taskTrackers = 1; // run a single task tracker node
+            final int jobTrackerPort = findFreePort();
+            final int taskTrackerPort = findFreePort();
+
+            JobConf conf = new JobConf();
+            conf.setInt("mapred.submit.replication", 1);
+            //conf.set("hadoop.job.history.location",new File(workDir).getAbsolutePath()+"/history");
+            System.setProperty("hadoop.log.dir", new File(workDir).getAbsolutePath() + "/logs");
+
+            mrCluster = new MiniMRCluster(jobTrackerPort, taskTrackerPort, taskTrackers,
+                    FileSystem.get(conf).getUri().toString(), taskTrackers, null, null, null, conf);
+
+            jobConf = mrCluster.createJobConf();
+        } catch (IOException e) {
+            throw new IllegalStateException("Failed to Setup MR Cluster", e);
+        }
+    }
+
+    private void setupZookeeper() {
+        try {
+            zookeeperDir = new File(workDir, "zk").getAbsolutePath();
+            zookeeperPort = findFreePort();
+            zookeeperCluster = new MiniZooKeeperCluster();
+            zookeeperCluster.setClientPort(zookeeperPort);
+            zookeeperCluster.startup(new File(zookeeperDir));
+        } catch(Exception e) {
+            throw new IllegalStateException("Failed to Setup Zookeeper Cluster", e);
+        }
+    }
+
+    private void setupHBaseCluster() {
+        final int numRegionServers = 1;
+
+        try {
+            hbaseDir = new File(workDir, "hbase").getAbsolutePath();
+            hbaseRoot = "file://" + hbaseDir;
+
+            hbaseConf = HBaseConfiguration.create();
+            //make sure it doesn't load the hbase-site.xml file
+            hbaseConf.clear();
+            hbaseConf.set("hbase.rootdir", hbaseRoot);
+            hbaseConf.set("hbase.master", "local");
+            hbaseConf.setInt("hbase.zookeeper.property.clientPort", zookeeperPort);
+            hbaseConf.set("hbase.zookeeper.quorum", "127.0.0.1");
+            hbaseConf.setInt("hbase.master.port", findFreePort());
+            hbaseConf.setInt("hbase.master.info.port", -1);
+            hbaseConf.setInt("hbase.regionserver.port", findFreePort());
+            hbaseConf.setInt("hbase.regionserver.info.port", -1);
+
+            hbaseCluster = new MiniHBaseCluster(hbaseConf, numRegionServers);
+            hbaseConf.set("hbase.master", hbaseCluster.getHMasterAddress().toString());
+            //opening the META table ensures that the cluster is running
+            new HTable(hbaseConf, HConstants.META_TABLE_NAME);
+        } catch (Exception e) {
+            throw new IllegalStateException("Failed to setup HBase Cluster", e);
+        }
+    }
+
+    private void setUpMetastore() throws Exception {
+        hiveConf = new HiveConf(this.getClass());
+
+        //The default org.apache.hadoop.hive.ql.hooks.PreExecutePrinter hook
+        //is present only in the ql/test directory
+        hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+        hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+        hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+        hiveConf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, "jdbc:derby:" + workDir + "/metastore_db;create=true");
+        //set where derby logs go
+        File derbyLogFile = new File(workDir + "/derby.log");
+        derbyLogFile.createNewFile();
+        System.setProperty("derby.stream.error.file", derbyLogFile.getPath());
+
+
+//        Driver driver = new Driver(hiveConf);
+//        SessionState.start(new CliSessionState(hiveConf));
+
+        hiveMetaStoreClient = new HiveMetaStoreClient(hiveConf);
+    }
+
+    private static int findFreePort() throws IOException {
+        ServerSocket server = new ServerSocket(0);
+        int port = server.getLocalPort();
+        server.close();
+        return port;
+    }
+
+}
diff --git storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/SkeletonHBaseTest.java storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/SkeletonHBaseTest.java
new file mode 100644
index 0000000..e87906c
--- /dev/null
+++ storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/SkeletonHBaseTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+/**
+ * Base class for HBase tests which need a mini cluster instance
+ */
+public abstract class SkeletonHBaseTest {
+
+    protected static String TEST_DIR = System.getProperty("test.data.dir", "./");
+
+    protected final static String DEFAULT_CONTEXT_HANDLE = "default";
+
+    protected static Map<String, Context> contextMap = new HashMap<String, Context>();
+    protected static Set<String> tableNames = new HashSet<String>();
+
+    protected void createTable(String tableName, String[] families) {
+        try {
+            HBaseAdmin admin = new HBaseAdmin(getHbaseConf());
+            HTableDescriptor tableDesc = new HTableDescriptor(tableName);
+            for(String family: families) {
+                HColumnDescriptor columnDescriptor = new HColumnDescriptor(family);
+                tableDesc.addFamily(columnDescriptor);
+            }
+            admin.createTable(tableDesc);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new IllegalStateException(e);
+        }
+    }
+
+    protected String newTableName(String prefix) {
+        String name = null;
+        int tries = 100;
+        do {
+            name = prefix + "_" + Math.abs(new Random().nextLong());
+        } while(tableNames.contains(name) && --tries > 0);
+        if(tableNames.contains(name))
+            throw new IllegalStateException("Couldn't find a unique table name, tableNames size: " + tableNames.size());
+        tableNames.add(name);
+        return name;
+    }
+
+    /**
+     * startup an HBase cluster instance before a test suite runs
+     */
+    @BeforeClass
+    public static void setup() {
+        if(!contextMap.containsKey(getContextHandle()))
+            contextMap.put(getContextHandle(), new Context(getContextHandle()));
+
+        contextMap.get(getContextHandle()).start();
+    }
+
+    /**
+     * shutdown the HBase cluster instance at the end of the test suite
+     */
+    @AfterClass
+    public static void tearDown() {
+        contextMap.get(getContextHandle()).stop();
+    }
+
+    /**
+     * override this with a different context handle if test suites are run simultaneously
+     * and ManyMiniCluster instances shouldn't be shared
+     * @return the context handle for this test suite
+     */
+    public static String getContextHandle() {
+        return DEFAULT_CONTEXT_HANDLE;
+    }
+
+    /**
+     * @return working directory for a given test context, which normally is a test suite
+     */
+    public String getTestDir() {
+        return contextMap.get(getContextHandle()).getTestDir();
+    }
+
+    /**
+     * @return ManyMiniCluster instance
+     */
+    public ManyMiniCluster getCluster() {
+        return contextMap.get(getContextHandle()).getCluster();
+    }
+
+    /**
+     * @return configuration of MiniHBaseCluster
+     */
+    public Configuration getHbaseConf() {
+        return contextMap.get(getContextHandle()).getHbaseConf();
+    }
+
+    /**
+     * @return configuration of MiniMRCluster
+     */
+    public Configuration getJobConf() {
+        return contextMap.get(getContextHandle()).getJobConf();
+    }
+
+    /**
+     * @return configuration of Hive Metastore
+     */
+    public HiveConf getHiveConf() {
+        return contextMap.get(getContextHandle()).getHiveConf();
+    }
+
+    /**
+     * @return filesystem used by ManyMiniCluster daemons
+     */
+    public FileSystem getFileSystem() {
+        return contextMap.get(getContextHandle()).getFileSystem();
+    }
+
+    /**
+     * class used to encapsulate a context which is normally used by
+     * a single TestSuite or across TestSuites when multi-threaded testing is turned on
+     */
+    public static class Context {
+        protected String testDir;
+        protected ManyMiniCluster cluster;
+
+        protected Configuration hbaseConf;
+        protected Configuration jobConf;
+        protected HiveConf hiveConf;
+
+        protected FileSystem fileSystem;
+
+        protected int usageCount = 0;
+
+        public Context(String handle) {
+            try {
+                testDir = new File(TEST_DIR + "/test_" + handle + "_" + Math.abs(new Random().nextLong()) + "/").getCanonicalPath();
+            } catch (IOException e) {
+                throw new IllegalStateException("Failed to generate testDir", e);
+            }
+            System.out.println("Cluster work directory: " + testDir);
+        }
+
+        public void start() {
+            if(usageCount++ == 0) {
+                cluster = new ManyMiniCluster(testDir);
+                cluster.start();
+                hbaseConf = cluster.getHBaseConf();
+                jobConf = cluster.getJobConf();
+                fileSystem = cluster.getFileSystem();
+                hiveConf = cluster.getHiveConf();
+            }
+        }
+
+        public void stop() {
+            if( --usageCount == 0) {
+                try {
+                    cluster.stop();
+                    cluster = null;
+                } finally {
+                    System.out.println("Trying to cleanup: " + testDir);
+                    try {
+                        FileUtil.fullyDelete(new File(testDir));
+                    } catch (IOException e) {
+                        throw new IllegalStateException("Failed to cleanup test dir", e);
+                    }
+                }
+            }
+        }
+
+        public String getTestDir() {
+            return testDir;
+        }
+
+        public ManyMiniCluster getCluster() {
+            return cluster;
+        }
+
+        public Configuration getHbaseConf() {
+            return hbaseConf;
+        }
+
+        public Configuration getJobConf() {
+            return jobConf;
+        }
+
+        public HiveConf getHiveConf() {
+            return hiveConf;
+        }
+
+        public FileSystem getFileSystem() {
+            return fileSystem;
+        }
+    }
+
+}
diff --git storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java
new file mode 100644
index 0000000..d612584
--- /dev/null
+++ storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.hbase.HBaseSerDe;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test HBaseDirectOutputStorageDriver and HBaseDirectOutputFormat using a MiniCluster
+ */
+public class TestHBaseDirectOutputStorageDriver extends SkeletonHBaseTest {
+
+    private void registerHBaseTable(String tableName) throws Exception {
+
+        String databaseName = MetaStoreUtils.DEFAULT_DATABASE_NAME;
+        HiveMetaStoreClient client = new HiveMetaStoreClient(getHiveConf());
+
+        try {
+            client.dropTable(databaseName, tableName);
+        } catch(Exception e) {
+        } //can fail with NoSuchObjectException
+
+        Table tbl = new Table();
+        tbl.setDbName(databaseName);
+        tbl.setTableName(tableName);
+        tbl.setTableType(TableType.EXTERNAL_TABLE.toString());
+        StorageDescriptor sd = new StorageDescriptor();
+
+        sd.setCols(getTableColumns());
+        tbl.setPartitionKeys(new ArrayList<FieldSchema>());
+
+        tbl.setSd(sd);
+
+        sd.setBucketCols(new ArrayList<String>(2));
+        sd.setSerdeInfo(new SerDeInfo());
+        sd.getSerdeInfo().setName(tbl.getTableName());
+        sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+        sd.getSerdeInfo().getParameters().put(
+                Constants.SERIALIZATION_FORMAT, "1");
+        sd.getSerdeInfo().setSerializationLib(HBaseSerDe.class.getName());
+        sd.setInputFormat("fillme");
+        sd.setOutputFormat(HBaseDirectOutputFormat.class.getName());
+
+        Map<String, String> tableParams = new HashMap<String, String>();
+        tableParams.put(HCatConstants.HCAT_ISD_CLASS, "fillme");
+        tableParams.put(HCatConstants.HCAT_OSD_CLASS, HBaseDirectOutputStorageDriver.class.getName());
+        tableParams.put(HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY, ":key,my_family:english,my_family:spanish");
+        tbl.setParameters(tableParams);
+
+        client.createTable(tbl);
+    }
+
+    protected List<FieldSchema> getTableColumns() {
+        List<FieldSchema> fields = new ArrayList<FieldSchema>();
+        fields.add(new FieldSchema("key", Constants.INT_TYPE_NAME, ""));
+        fields.add(new FieldSchema("english", Constants.STRING_TYPE_NAME, ""));
+        fields.add(new FieldSchema("spanish", Constants.STRING_TYPE_NAME, ""));
+        return fields;
+    }
+
+    private static List<HCatFieldSchema> generateDataColumns() throws HCatException {
+        List<HCatFieldSchema> dataColumns = new ArrayList<HCatFieldSchema>();
+        dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("key", Constants.INT_TYPE_NAME, "")));
+        dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("english", Constants.STRING_TYPE_NAME, "")));
+        dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("spanish", Constants.STRING_TYPE_NAME, "")));
+        return dataColumns;
+    }
+
+    public void test() throws IOException {
+        Configuration conf = getHbaseConf();
+        String tableName = "my_table";
+        byte[] tableNameBytes = Bytes.toBytes(tableName);
+        String familyName = "my_family";
+        byte[] familyNameBytes = Bytes.toBytes(familyName);
+        createTable(tableName, new String[]{familyName});
+        HTable table = new HTable(getHbaseConf(), tableNameBytes);
+        byte[] key = Bytes.toBytes("foo");
+        byte[] qualifier = Bytes.toBytes("qualifier");
+        byte[] val = Bytes.toBytes("bar");
+        Put put = new Put(key);
+        put.add(familyNameBytes, qualifier, val);
+        table.put(put);
+        Result result = table.get(new Get(key));
+        assertTrue(Bytes.equals(val, result.getValue(familyNameBytes, qualifier)));
+    }
+
+    @Test
+    public void directOutputFormatTest() throws IOException, ClassNotFoundException, InterruptedException {
+        String tableName = newTableName("mrTest");
+        byte[] tableNameBytes = Bytes.toBytes(tableName);
+        String familyName = "my_family";
+        byte[] familyNameBytes = Bytes.toBytes(familyName);
+
+        //include hbase config in conf file
+        Configuration conf = new Configuration(getJobConf());
+        for(Map.Entry<String, String> el: getHbaseConf()) {
+            if(el.getKey().startsWith("hbase.")) {
+                conf.set(el.getKey(), el.getValue());
+            }
+        }
+
+        //create table
+        createTable(tableName, new String[]{familyName});
+
+        String data[] = {"1,english:ONE,spanish:UNO",
+                "2,english:ONE,spanish:DOS",
+                "3,english:ONE,spanish:TRES"};
+
+        // input/output settings
+        Path inputPath = new Path(getTestDir(), "mapred/testHCatMapReduceInput/");
+        getFileSystem().mkdirs(inputPath);
+        FSDataOutputStream os = getFileSystem().create(new Path(inputPath, "inputFile.txt"));
+        for(String line: data)
+            os.write(Bytes.toBytes(line + "\n"));
+        os.close();
+
+        //create job
+        Job job = new Job(conf, "hcat mapreduce write test");
+        job.setJarByClass(this.getClass());
+        job.setMapperClass(MapWrite.class);
+
+        job.setInputFormatClass(TextInputFormat.class);
+        TextInputFormat.setInputPaths(job, inputPath);
+
+        job.setOutputFormatClass(HBaseDirectOutputFormat.class);
+        job.getConfiguration().set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, tableName);
+
+        job.setMapOutputKeyClass(BytesWritable.class);
+        job.setMapOutputValueClass(Put.class);
+
+        job.setOutputKeyClass(BytesWritable.class);
+        job.setOutputValueClass(Put.class);
+
+        job.setNumReduceTasks(0);
+        assertTrue(job.waitForCompletion(true));
+
+        //verify
+        HTable table = new HTable(conf, tableName);
+        Scan scan = new Scan();
+        scan.addFamily(Bytes.toBytes("my_family"));
+        ResultScanner scanner = table.getScanner(scan);
+        int index = 0;
+        for(Result result: scanner) {
+            String vals[] = data[index].toString().split(",");
+            for(int i = 1; i < vals.length; i++) {
+                String pair[] = vals[i].split(":");
+                assertTrue(result.containsColumn(familyNameBytes, Bytes.toBytes(pair[0])));
+                assertEquals(pair[1], Bytes.toString(result.getValue(familyNameBytes, Bytes.toBytes(pair[0]))));
+            }
+            index++;
+        }
+        assertEquals(data.length, index);
+    }
+
+    public static class MapWrite extends Mapper<LongWritable, Text, BytesWritable, Put> {
+
+        @Override
+        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+            String vals[] = value.toString().split(",");
+            Put put = new Put(Bytes.toBytes(vals[0]));
+            for(int i = 1; i < vals.length; i++) {
+                String pair[] = vals[i].split(":");
+                put.add(Bytes.toBytes("my_family"), Bytes.toBytes(pair[0]), Bytes.toBytes(pair[1]));
+            }
+            context.write(new BytesWritable(Bytes.toBytes(vals[0])), put);
+        }
+    }
+
+    @Test
+    public void directOutputStorageDriverTest() throws Exception {
+        String tableName = newTableName("mrTest");
+        String familyName = "my_family";
+        byte[] familyNameBytes = Bytes.toBytes(familyName);
+
+        //include hbase config in conf file
+        Configuration conf = new Configuration(getJobConf());
+        for(Map.Entry<String, String> el: getHbaseConf()) {
+            if(el.getKey().startsWith("hbase.")) {
+                conf.set(el.getKey(), el.getValue());
+            }
+        }
+
+        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(getHiveConf().getAllProperties()));
+
+        //create table
+        createTable(tableName, new String[]{familyName});
+        registerHBaseTable(tableName);
+
+        String data[] = {"1,english:ONE,spanish:UNO",
+                "2,english:ONE,spanish:DOS",
+                "3,english:ONE,spanish:TRES"};
+
+        // input/output settings
+        Path inputPath = new Path(getTestDir(), "mapred/testHCatMapReduceInput/");
+        getFileSystem().mkdirs(inputPath);
+        FSDataOutputStream os = getFileSystem().create(new Path(inputPath, "inputFile.txt"));
+        for(String line: data)
+            os.write(Bytes.toBytes(line + "\n"));
+        os.close();
+
+        //create job
+        Job job = new Job(conf, "hcat mapreduce write test");
+        job.setJarByClass(this.getClass());
+        job.setMapperClass(MapHCatWrite.class);
+
+        job.setInputFormatClass(TextInputFormat.class);
+        TextInputFormat.setInputPaths(job, inputPath);
+
+        job.setOutputFormatClass(HCatOutputFormat.class);
+        OutputJobInfo outputJobInfo = OutputJobInfo.create(null, tableName, null, null, null);
+        outputJobInfo.getProperties().put(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY, "1");
+        HCatOutputFormat.setOutput(job, outputJobInfo);
+
+        job.setMapOutputKeyClass(BytesWritable.class);
+        job.setMapOutputValueClass(HCatRecord.class);
+
+        job.setOutputKeyClass(BytesWritable.class);
+        job.setOutputValueClass(Put.class);
+
+        job.setNumReduceTasks(0);
+        assertTrue(job.waitForCompletion(true));
+
+        //verify
+        HTable table = new HTable(conf, tableName);
+        Scan scan = new Scan();
+        scan.addFamily(Bytes.toBytes("my_family"));
+        ResultScanner scanner = table.getScanner(scan);
+        int index = 0;
+        for(Result result: scanner) {
+            String vals[] = data[index].toString().split(",");
+            for(int i = 1; i < vals.length; i++) {
+                String pair[] = vals[i].split(":");
+                assertTrue(result.containsColumn(familyNameBytes, Bytes.toBytes(pair[0])));
+                assertEquals(pair[1], Bytes.toString(result.getValue(familyNameBytes, Bytes.toBytes(pair[0]))));
+            }
+            index++;
+        }
+        assertEquals(data.length, index);
+    }
+
+    public static class MapHCatWrite extends Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
+
+        @Override
+        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+            HCatRecord record = new DefaultHCatRecord(3);
+            HCatSchema schema = new HCatSchema(generateDataColumns());
+            String vals[] = value.toString().split(",");
+            record.setInteger("key", schema, Integer.parseInt(vals[0]));
+            for(int i = 1; i < vals.length; i++) {
+                String pair[] = vals[i].split(":");
+                record.set(pair[0], schema, pair[1]);
+            }
+            context.write(new BytesWritable(Bytes.toBytes(vals[0])), record);
+        }
+    }
+}
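For reference, a minimal sketch of how a standalone map-only job would wire in the new HBaseDirectOutputFormat outside of HCatOutputFormat, mirroring directOutputFormatTest above. The driver class, mapper, input path, and table name below are hypothetical placeholders; HBaseDirectOutputFormat and HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY come from this patch, and since HBaseDirectOutputFormat is package-private the driver is assumed to live in org.apache.hcatalog.hbase. The target table must already exist.

    package org.apache.hcatalog.hbase;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    import java.io.IOException;

    public class DirectWriteExample {

        // Hypothetical mapper: turns "rowkey,qualifier:value" lines into Puts.
        public static class ExampleMapper extends Mapper<LongWritable, Text, BytesWritable, Put> {
            @Override
            public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String[] vals = value.toString().split(",");
                Put put = new Put(Bytes.toBytes(vals[0]));
                String[] pair = vals[1].split(":");
                put.add(Bytes.toBytes("my_family"), Bytes.toBytes(pair[0]), Bytes.toBytes(pair[1]));
                context.write(new BytesWritable(Bytes.toBytes(vals[0])), put);
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();   // picks up hbase-site.xml
            Job job = new Job(conf, "direct hbase write example");
            job.setJarByClass(DirectWriteExample.class);
            job.setMapperClass(ExampleMapper.class);
            job.setInputFormatClass(TextInputFormat.class);
            TextInputFormat.setInputPaths(job, new Path(args[0]));
            // HBaseDirectOutputFormat.setConf() copies this property into TableOutputFormat.OUTPUT_TABLE
            job.setOutputFormatClass(HBaseDirectOutputFormat.class);
            job.getConfiguration().set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, "my_table");
            job.setMapOutputKeyClass(BytesWritable.class);
            job.setMapOutputValueClass(Put.class);
            job.setNumReduceTasks(0);                            // map-only; each Put goes straight to HBase
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }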