diff --git storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java
new file mode 100644
index 0000000..989286f
--- /dev/null
+++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
+import org.apache.hcatalog.mapreduce.HCatTableInfo;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+/**
+ * Base class shared by both {@link HBaseBulkOutputStorageDriver} and {@link HBaseDirectOutputStorageDriver}
+ */
+abstract class HBaseBaseOutputStorageDriver extends HCatOutputStorageDriver {
+    protected HCatTableInfo tableInfo;
+    protected ResultConverter converter;
+    protected OutputJobInfo outputJobInfo;
+    protected HCatSchema schema;
+    protected HCatSchema outputSchema;
+
+    @Override
+    public void initialize(JobContext context, Properties hcatProperties) throws IOException {
+        hcatProperties = (Properties)hcatProperties.clone();
+        super.initialize(context, hcatProperties);
+
+
+        String jobString = context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+        if( jobString == null ) {
+            throw new IOException("OutputJobInfo information not found in JobContext. 
HCatInputFormat.setOutput() not called?"); + } + + outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(jobString); + //override table properties with user defined ones + //TODO in the future we should be more selective on what to override + hcatProperties.putAll(outputJobInfo.getProperties()); + outputJobInfo.getProperties().putAll(hcatProperties); + hcatProperties = outputJobInfo.getProperties(); + + + String revision = outputJobInfo.getProperties().getProperty(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY); + if(revision == null) { + outputJobInfo.getProperties() + .setProperty(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY, + new Path(outputJobInfo.getLocation()).getName()); + } + + tableInfo = outputJobInfo.getTableInfo(); + schema = tableInfo.getDataColumns(); + + List fields = HCatUtil.getFieldSchemaList(outputSchema.getFields()); + hcatProperties.setProperty(Constants.LIST_COLUMNS, + MetaStoreUtils.getColumnNamesFromFieldSchema(fields)); + hcatProperties.setProperty(Constants.LIST_COLUMN_TYPES, + MetaStoreUtils.getColumnTypesFromFieldSchema(fields)); + + //outputSchema should be set by HCatOutputFormat calling setSchema, prior to initialize being called + converter = new HBaseSerDeResultConverter(schema, + outputSchema, + hcatProperties); + context.getConfiguration().set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY,tableInfo.getTableName()); + } + + @Override + public void setSchema(JobContext jobContext, HCatSchema schema) throws IOException { + this.outputSchema = schema; + } + + @Override + public WritableComparable generateKey(HCatRecord value) throws IOException { + //HBase doesn't use KEY as part of output + return null; + } + + @Override + public Writable convertValue(HCatRecord value) throws IOException { + return converter.convert(value); + } + + @Override + public void setPartitionValues(JobContext jobContext, Map partitionValues) throws IOException { + //no partitions for this driver + } + + @Override + public Path getWorkFilePath(TaskAttemptContext context, String outputLoc) throws IOException { + return null; + } + + @Override + public void setOutputPath(JobContext jobContext, String location) throws IOException { + //no output path + } + + @Override + public String getOutputLocation(JobContext jobContext, String tableLocation, List partitionCols, Map partitionValues, String dynHash) throws IOException { + //TODO figure out a way to include user specified revision number as part of dir + return new Path(tableLocation, Long.toString(System.currentTimeMillis())).toString(); + } +} diff --git storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java new file mode 100644 index 0000000..5165f7f --- /dev/null +++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java @@ -0,0 +1,108 @@ +package org.apache.hcatalog.hbase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; 
+
+import java.io.IOException;
+
+/**
+ * Class which imports data into HBase via its "bulk load" feature, wherein HFiles
+ * are created by the MR job using HFileOutputFormat and then later "moved" into
+ * the appropriate region servers.
+ */
+class HBaseBulkOutputFormat extends SequenceFileOutputFormat<ImmutableBytesWritable, Put> {
+
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
+        return new HBaseBulkOutputCommitter(FileOutputFormat.getOutputPath(context),context,(FileOutputCommitter)super.getOutputCommitter(context));
+    }
+
+    private static class HBaseBulkOutputCommitter extends FileOutputCommitter {
+        FileOutputCommitter baseOutputCommitter;
+
+        public HBaseBulkOutputCommitter(Path outputPath, TaskAttemptContext taskAttemptContext,
+                                        FileOutputCommitter baseOutputCommitter) throws IOException {
+            super(outputPath, taskAttemptContext);
+            this.baseOutputCommitter = baseOutputCommitter;
+        }
+
+        @Override
+        public void abortTask(TaskAttemptContext context) {
+            baseOutputCommitter.abortTask(context);
+        }
+
+        @Override
+        public void commitTask(TaskAttemptContext context) throws IOException {
+            baseOutputCommitter.commitTask(context);
+        }
+
+        @Override
+        public Path getWorkPath() throws IOException {
+            return baseOutputCommitter.getWorkPath();
+        }
+
+        @Override
+        public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
+            return baseOutputCommitter.needsTaskCommit(context);
+        }
+
+        @Override
+        public void setupJob(JobContext context) throws IOException {
+            baseOutputCommitter.setupJob(context);
+        }
+
+        @Override
+        public void setupTask(TaskAttemptContext context) throws IOException {
+            baseOutputCommitter.setupTask(context);
+        }
+
+        @Override
+        public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
+            try {
+                baseOutputCommitter.abortJob(jobContext,state);
+            } finally {
+                cleanIntermediate(jobContext);
+            }
+        }
+
+        @Override
+        public void cleanupJob(JobContext context) throws IOException {
+            try {
+                baseOutputCommitter.cleanupJob(context);
+            } finally {
+                cleanIntermediate(context);
+            }
+        }
+
+        @Override
+        public void commitJob(JobContext jobContext) throws IOException {
+            try {
+                baseOutputCommitter.commitJob(jobContext);
+                Configuration conf = jobContext.getConfiguration();
+                Path srcPath = FileOutputFormat.getOutputPath(jobContext);
+                Path destPath = new Path(srcPath.getParent(),srcPath.getName()+"_hfiles");
+                ImportSequenceFile.runJob(conf,
+                                          conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY),
+                                          srcPath,
+                                          destPath);
+            } finally {
+                cleanIntermediate(jobContext);
+            }
+        }
+
+        public void cleanIntermediate(JobContext jobContext) throws IOException {
+            FileSystem fs = FileSystem.get(jobContext.getConfiguration());
+            fs.delete(FileOutputFormat.getOutputPath(jobContext),true);
+        }
+    }
+}
diff --git storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java
new file mode 100644
index 0000000..9dfd238
--- /dev/null
+++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.hbase; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hcatalog.data.HCatRecord; + +import java.io.IOException; +import java.util.Properties; + + +/** + * Storage driver which works with {@link HBaseBulkOutputFormat} and makes use + * of HBase's "bulk load" feature to get data into HBase. This should be + * efficient for large batch writes in comparison to HBaseDirectOutputStorageDriver. + */ +public class HBaseBulkOutputStorageDriver extends HBaseBaseOutputStorageDriver { + private OutputFormat outputFormat; + private final static ImmutableBytesWritable EMPTY_KEY = new ImmutableBytesWritable(new byte[0]); + + @Override + public void initialize(JobContext context, Properties hcatProperties) throws IOException { + super.initialize(context, hcatProperties); + Path outputDir = new Path(outputJobInfo.getLocation()); + context.getConfiguration().set("mapred.output.dir", outputDir.toString()); + outputFormat = new HBaseBulkOutputFormat(); + } + + @Override + public OutputFormat, ? 
super Writable> getOutputFormat() throws IOException {
+        return outputFormat;
+    }
+
+    @Override
+    public WritableComparable generateKey(HCatRecord value) throws IOException {
+        return EMPTY_KEY;
+    }
+
+}
diff --git storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java
index 7d88bc7..21bc5b5 100644
--- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java
+++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.hive.hbase.HBaseSerDe;
 import org.apache.hcatalog.common.HCatConstants;
 
 /**
- * Constants class for constants used in the HBase Storage Drivers module
+ * Constants class for constants used in the HBase storage drivers module
  */
 class HBaseConstants {
 
@@ -35,4 +35,7 @@ class HBaseConstants {
     /** key used to define the column mapping of hbase to hcatalog schema */
     public static final String PROPERTY_COLUMN_MAPPING_KEY = HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+"."+
             HBaseSerDe.HBASE_COLUMNS_MAPPING;
+    /** key used to define whether the bulk storage driver will be used or not */
+    public static final String PROPERTY_OSD_BULK_MODE_KEY = HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".hbase.output.bulkMode";
+
 }
diff --git storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java
index 241866d..845f0db 100644
--- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java
+++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java
@@ -18,64 +18,24 @@ package org.apache.hcatalog.hbase;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.serde.Constants;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hcatalog.common.HCatConstants;
-import org.apache.hcatalog.common.HCatUtil;
-import org.apache.hcatalog.data.HCatRecord;
-import org.apache.hcatalog.data.schema.HCatSchema;
-import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
-import org.apache.hcatalog.mapreduce.HCatTableInfo;
-import org.apache.hcatalog.mapreduce.OutputJobInfo;
 
 import java.io.IOException;
-import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 
 /**
  * HBase Storage driver implementation which uses "direct" writes to hbase for writing out records.
  */
-public class HBaseDirectOutputStorageDriver extends HCatOutputStorageDriver {
-    private HCatTableInfo tableInfo;
+public class HBaseDirectOutputStorageDriver extends HBaseBaseOutputStorageDriver {
+
     private HBaseDirectOutputFormat outputFormat;
-    private ResultConverter converter;
-    private OutputJobInfo outputJobInfo;
-    private HCatSchema schema;
-    private HCatSchema outputSchema;
 
     @Override
     public void initialize(JobContext context, Properties hcatProperties) throws IOException {
         super.initialize(context, hcatProperties);
-        String jobString = context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
-        if( jobString == null ) {
-            throw new IOException("OutputJobInfo information not found in JobContext. 
HCatInputFormat.setOutput() not called?"); - } - outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(jobString); - tableInfo = outputJobInfo.getTableInfo(); - schema = tableInfo.getDataColumns(); - - List fields = HCatUtil.getFieldSchemaList(outputSchema.getFields()); - hcatProperties.setProperty(Constants.LIST_COLUMNS, - MetaStoreUtils.getColumnNamesFromFieldSchema(fields)); - hcatProperties.setProperty(Constants.LIST_COLUMN_TYPES, - MetaStoreUtils.getColumnTypesFromFieldSchema(fields)); - - //override table properties with user defined ones - //in the future we should be more selective on what to override - hcatProperties.putAll(outputJobInfo.getProperties()); - //outputSchema should be set by HCatOutputFormat calling setSchema, prior to initialize being called - converter = new HBaseSerDeResultConverter(schema, - outputSchema, - hcatProperties); - context.getConfiguration().set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY,tableInfo.getTableName()); outputFormat = new HBaseDirectOutputFormat(); outputFormat.setConf(context.getConfiguration()); } @@ -85,39 +45,4 @@ public class HBaseDirectOutputStorageDriver extends HCatOutputStorageDriver { return outputFormat; } - @Override - public void setSchema(JobContext jobContext, HCatSchema schema) throws IOException { - this.outputSchema = schema; - } - - @Override - public WritableComparable generateKey(HCatRecord value) throws IOException { - //HBase doesn't use KEY as part of output - return null; - } - - @Override - public Writable convertValue(HCatRecord value) throws IOException { - return converter.convert(value); - } - - @Override - public void setPartitionValues(JobContext jobContext, Map partitionValues) throws IOException { - //no partitions for this driver - } - - @Override - public Path getWorkFilePath(TaskAttemptContext context, String outputLoc) throws IOException { - return null; - } - - @Override - public void setOutputPath(JobContext jobContext, String location) throws IOException { - //no output path - } - - @Override - public String getOutputLocation(JobContext jobContext, String tableLocation, List partitionCols, Map partitionValues, String dynHash) throws IOException { - return null; - } } diff --git storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseOutputStorageDriver.java storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseOutputStorageDriver.java new file mode 100644 index 0000000..19ddd28 --- /dev/null +++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseOutputStorageDriver.java @@ -0,0 +1,102 @@ +package org.apache.hcatalog.hbase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hcatalog.data.HCatRecord; +import org.apache.hcatalog.data.schema.HCatSchema; +import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * Forwarding HBaseOutputStorageDriver, actual implementation is decided by a configuration + * {@link HBaseConstants.PROPERTY_OSD_BULK_MODE_KEY} which defaults to HBaseBulkOutputStorageDriver + */ +public class HBaseOutputStorageDriver extends HCatOutputStorageDriver { + + private HBaseBulkOutputStorageDriver bulkOSD = new HBaseBulkOutputStorageDriver(); + private 
HBaseDirectOutputStorageDriver directOSD = new HBaseDirectOutputStorageDriver(); + private HBaseBaseOutputStorageDriver activeOSD; + + @Override + public void initialize(JobContext context, Properties hcatProperties) throws IOException { + super.initialize(context, hcatProperties); + determineOSD(context.getConfiguration(),hcatProperties); + activeOSD.initialize(context,hcatProperties); + } + + @Override + public WritableComparable generateKey(HCatRecord value) throws IOException { + return activeOSD.generateKey(value); + } + + @Override + public Writable convertValue(HCatRecord value) throws IOException { + return activeOSD.convertValue(value); + } + + @Override + public String getOutputLocation(JobContext jobContext, String tableLocation, List partitionCols, Map partitionValues, String dynHash) throws IOException { + //sanity check since we can't determine which will be used till initialize + //and this method gets called before that + String location = bulkOSD.getOutputLocation(jobContext, tableLocation, partitionCols, partitionValues, dynHash); + if(!location.equals(directOSD.getOutputLocation(jobContext, tableLocation, partitionCols, partitionValues, dynHash))) { + throw new IOException("bulkOSD and directOSD return inconsistent path for getOutputLocation()"); + } + return location; + } + + @Override + public Path getWorkFilePath(TaskAttemptContext context, String outputLoc) throws IOException { + return activeOSD.getWorkFilePath(context,outputLoc); + } + + @Override + public OutputFormat, ? super Writable> getOutputFormat() throws IOException { + return activeOSD.getOutputFormat(); + } + + @Override + public void setOutputPath(JobContext jobContext, String location) throws IOException { + directOSD.setOutputPath(jobContext, location); + bulkOSD.setOutputPath(jobContext, location); + } + + @Override + public void setSchema(JobContext jobContext, HCatSchema schema) throws IOException { + directOSD.setSchema(jobContext,schema); + bulkOSD.setSchema(jobContext,schema); + } + + @Override + public void setPartitionValues(JobContext jobContext, Map partitionValues) throws IOException { + directOSD.setPartitionValues(jobContext,partitionValues); + bulkOSD.setPartitionValues(jobContext,partitionValues); + } + + private void determineOSD(Configuration conf, Properties prop) { + if(activeOSD != null) + return; + + String bulkMode = conf.get(HBaseConstants.PROPERTY_OSD_BULK_MODE_KEY); + if(bulkMode == null && prop != null) + bulkMode = prop.getProperty(HBaseConstants.PROPERTY_OSD_BULK_MODE_KEY); + + if(bulkMode != null && !Boolean.valueOf(bulkMode)) { + activeOSD = directOSD; + bulkOSD = null; + } + else { + activeOSD = bulkOSD; + directOSD = null; + } + } +} diff --git storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java index 23450c2..515554e 100644 --- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java +++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java @@ -71,7 +71,7 @@ class HBaseSerDeResultConverter implements ResultConverter { * @param hcatProperties table properties * @throws IOException thrown if hive's HBaseSerDe couldn't be initialized */ - public HBaseSerDeResultConverter(HCatSchema schema, + HBaseSerDeResultConverter(HCatSchema schema, HCatSchema outputSchema, Properties hcatProperties) throws IOException { diff --git 
storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java new file mode 100644 index 0000000..0d5191e --- /dev/null +++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java @@ -0,0 +1,232 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hcatalog.hbase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.filecache.DistributedCache; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.mapreduce.PutSortReducer; +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner; + +import java.io.IOException; +import java.net.URI; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; + +import static org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner.*; + + +/** + * MapReduce job which reads a series of Puts stored in a sequence file + * and imports the data into HBase. It needs to create the necessary HBase + * regions using HFileOutputFormat and then notify the correct region servers + * to doBulkLoad(). This will be used After an MR job has written the SequenceFile + * and data needs to be bulk loaded onto HBase. 
+ */ +class ImportSequenceFile { + private final static Log LOG = LogFactory.getLog(ImportSequenceFile.class); + private final static String NAME = "HCatImportSequenceFile"; + private final static String IMPORTER_WORK_DIR = "_IMPORTER_MR_WORK_DIR"; + + + private static class SequenceFileImporter extends Mapper { + + @Override + public void map(ImmutableBytesWritable rowKey, Put value, + Context context) + throws IOException { + try { + context.write(new ImmutableBytesWritable(value.getRow()), value); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + private static class ImporterOutputFormat extends HFileOutputFormat { + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException { + final OutputCommitter baseOutputCommitter = super.getOutputCommitter(context); + + return new OutputCommitter() { + @Override + public void setupJob(JobContext jobContext) throws IOException { + baseOutputCommitter.setupJob(jobContext); + } + + @Override + public void setupTask(TaskAttemptContext taskContext) throws IOException { + baseOutputCommitter.setupTask(taskContext); + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException { + return baseOutputCommitter.needsTaskCommit(taskContext); + } + + @Override + public void commitTask(TaskAttemptContext taskContext) throws IOException { + baseOutputCommitter.commitTask(taskContext); + } + + @Override + public void abortTask(TaskAttemptContext taskContext) throws IOException { + baseOutputCommitter.abortTask(taskContext); + } + + @Override + public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException { + try { + baseOutputCommitter.abortJob(jobContext,state); + } finally { + cleanupScratch(jobContext); + } + } + + @Override + public void commitJob(JobContext jobContext) throws IOException { + try { + baseOutputCommitter.commitJob(jobContext); + Configuration conf = jobContext.getConfiguration(); + //import hfiles + new LoadIncrementalHFiles(conf) + .doBulkLoad(HFileOutputFormat.getOutputPath(jobContext), + new HTable(conf, + conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY))); + } finally { + cleanupScratch(jobContext); + } + } + + @Override + public void cleanupJob(JobContext context) throws IOException { + try { + baseOutputCommitter.cleanupJob(context); + } finally { + cleanupScratch(context); + } + } + + private void cleanupScratch(JobContext context) throws IOException{ + FileSystem fs = FileSystem.get(context.getConfiguration()); + fs.delete(HFileOutputFormat.getOutputPath(context),true); + } + }; + } + } + + private static Job createSubmittableJob(Configuration conf, String tableName, Path inputDir, Path scratchDir, boolean localMode) + throws IOException { + Job job = new Job(conf, NAME + "_" + tableName); + job.setJarByClass(SequenceFileImporter.class); + FileInputFormat.setInputPaths(job, inputDir); + job.setInputFormatClass(SequenceFileInputFormat.class); + job.setMapperClass(SequenceFileImporter.class); + + HTable table = new HTable(conf, tableName); + job.setReducerClass(PutSortReducer.class); + FileOutputFormat.setOutputPath(job, scratchDir); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + job.setMapOutputValueClass(Put.class); + HFileOutputFormat.configureIncrementalLoad(job, table); + //override OutputFormatClass with our own so we can include cleanup in the committer + job.setOutputFormatClass(ImporterOutputFormat.class); + + //local mode doesn't support symbolic links so we have to manually 
set the actual path + if(localMode) { + String partitionFile = null; + for(URI uri: DistributedCache.getCacheFiles(job.getConfiguration())) { + if(DEFAULT_PATH.equals(uri.getFragment())) { + partitionFile = uri.toString(); + break; + } + } + partitionFile = partitionFile.substring(0,partitionFile.lastIndexOf("#")); + job.getConfiguration().set(TotalOrderPartitioner.PARTITIONER_PATH,partitionFile.toString()); + } + + //add hbase dependency jars + TableMapReduceUtil.addDependencyJars(job); + TableMapReduceUtil.addDependencyJars(job.getConfiguration()); + return job; + } + + /** + * Method to run the Importer MapReduce Job. Normally will be called by another MR job + * during OutputCommitter.commitJob(). + * @param otherConf configuration of the parent job + * @param tableName name of table to bulk load data into + * @param InputDir path of SequenceFile formatted data to read + * @param scratchDir temporary path for the Importer MR job to build the HFiles which will be imported + * @return + */ + static boolean runJob(Configuration otherConf, String tableName, Path InputDir, Path scratchDir) { + Configuration conf = HBaseConfiguration.create(); + for(Map.Entry el: otherConf) { + if(el.getKey().startsWith("hbase.")) + conf.set(el.getKey(),el.getValue()); + } + conf.set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, tableName); + + boolean localMode = "local".equals(conf.get("mapred.job.tracker")); + boolean success = false; + try { + FileSystem fs = FileSystem.get(conf); + Path workDir = new Path(new Job(otherConf).getWorkingDirectory(),IMPORTER_WORK_DIR); + if(!fs.mkdirs(workDir)) + throw new IOException("Importer work directory already exists: "+workDir); + Job job = createSubmittableJob(conf, tableName, InputDir, scratchDir, localMode); + job.setWorkingDirectory(workDir); + success = job.waitForCompletion(true); + fs.delete(workDir, true); + //We only cleanup on success because failure might've been caused by existence of target directory + if(localMode && success) + new ImporterOutputFormat().getOutputCommitter(new TaskAttemptContext(conf,new TaskAttemptID())).commitJob(job); + } catch (InterruptedException e) { + LOG.error("ImportSequenceFile Failed", e); + } catch (ClassNotFoundException e) { + LOG.error("ImportSequenceFile Failed",e); + } catch (IOException e) { + LOG.error("ImportSequenceFile Failed",e); + } + return success; + } + +} diff --git storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java new file mode 100644 index 0000000..c25e56d --- /dev/null +++ storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java @@ -0,0 +1,483 @@ +package org.apache.hcatalog.hbase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hive.hbase.HBaseSerDe; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.TableType; +import 
org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.serde.Constants; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.common.HCatException; +import org.apache.hcatalog.common.HCatUtil; +import org.apache.hcatalog.data.DefaultHCatRecord; +import org.apache.hcatalog.data.HCatRecord; +import org.apache.hcatalog.data.schema.HCatFieldSchema; +import org.apache.hcatalog.data.schema.HCatSchema; +import org.apache.hcatalog.data.schema.HCatSchemaUtils; +import org.apache.hcatalog.mapreduce.HCatOutputFormat; +import org.apache.hcatalog.mapreduce.OutputJobInfo; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Tests components of HBaseBulkOutputStorageDriver using ManyMiniCluster. + * Including ImprtSequenceFile, HBaseOutputStorageDrivers and HBaseBulkOutputFormat + */ +public class TestHBaseBulkOutputStorageDriver extends SkeletonHBaseTest { + private final String suiteName = "TestHBaseBulkOutputStorageDriver"; + + private void registerHBaseTable(String tableName) throws Exception { + + String databaseName = MetaStoreUtils.DEFAULT_DATABASE_NAME ; + HiveMetaStoreClient client = new HiveMetaStoreClient(getHiveConf()); + + try { + client.dropTable(databaseName, tableName); + } catch(Exception e) { + } //can fail with NoSuchObjectException + + + Table tbl = new Table(); + tbl.setDbName(databaseName); + tbl.setTableName(tableName); + tbl.setTableType(TableType.EXTERNAL_TABLE.toString()); + StorageDescriptor sd = new StorageDescriptor(); + sd.setLocation(getTestDir()+"/"+suiteName+"/"+tableName); + sd.setCols(getTableColumns()); + tbl.setPartitionKeys(new ArrayList()); + + tbl.setSd(sd); + + sd.setBucketCols(new ArrayList(2)); + sd.setSerdeInfo(new SerDeInfo()); + sd.getSerdeInfo().setName(tbl.getTableName()); + sd.getSerdeInfo().setParameters(new HashMap()); + sd.getSerdeInfo().getParameters().put( + Constants.SERIALIZATION_FORMAT, "1"); + sd.getSerdeInfo().setSerializationLib(HBaseSerDe.class.getName()); + sd.setInputFormat("fillme"); + sd.setOutputFormat(HBaseBulkOutputFormat.class.getName()); + + Map tableParams = new HashMap(); + tableParams.put(HCatConstants.HCAT_ISD_CLASS, "fillme"); + tableParams.put(HCatConstants.HCAT_OSD_CLASS, HBaseOutputStorageDriver.class.getName()); + tableParams.put(HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY,":key,my_family:english,my_family:spanish"); + tbl.setParameters(tableParams); + + client.createTable(tbl); + } + + protected List getTableColumns() { + List fields = new ArrayList(); + fields.add(new FieldSchema("key", Constants.INT_TYPE_NAME, "")); + fields.add(new FieldSchema("english", Constants.STRING_TYPE_NAME, "")); + fields.add(new 
FieldSchema("spanish", Constants.STRING_TYPE_NAME, "")); + return fields; + } + + private static List generateDataColumns() throws HCatException { + List dataColumns = new ArrayList(); + dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("key", Constants.INT_TYPE_NAME, ""))); + dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("english", Constants.STRING_TYPE_NAME, ""))); + dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("spanish", Constants.STRING_TYPE_NAME, ""))); + return dataColumns; + } + + public static class MapWrite extends Mapper { + + @Override + public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { + String vals[] = value.toString().split(","); + Put put = new Put(Bytes.toBytes(vals[0])); + for(int i=1;i { + @Override + public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { + HCatRecord record = new DefaultHCatRecord(3); + HCatSchema schema = new HCatSchema(generateDataColumns()); + String vals[] = value.toString().split(","); + record.setInteger("key",schema,Integer.parseInt(vals[0])); + for(int i=1;i el: getHbaseConf()) { + if(el.getKey().startsWith("hbase.")) { + conf.set(el.getKey(),el.getValue()); + } + } + + //create table + conf.set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, tableName); + createTable(tableName, new String[]{familyName}); + + String data[] = {"1,english:one,spanish:uno", + "2,english:two,spanish:dos", + "3,english:three,spanish:tres"}; + + + + // input/output settings + Path inputPath = new Path(getTestDir(), "mapred/testHCatMapReduceInput/"); + getFileSystem().mkdirs(inputPath); + FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt")); + for(String line: data) + os.write(Bytes.toBytes(line + "\n")); + os.close(); + Path interPath = new Path(getTestDir()+"/hbaseBulkOutputFormatTest/inter"); + + //create job + Job job = new Job(conf, "bulk write"); + job.setWorkingDirectory(new Path(getTestDir(),"hbaseBulkOutputFormatTest_MR")); + job.setJarByClass(this.getClass()); + job.setMapperClass(MapWrite.class); + + job.setInputFormatClass(TextInputFormat.class); + TextInputFormat.setInputPaths(job, inputPath); + + job.setOutputFormatClass(HBaseBulkOutputFormat.class); + SequenceFileOutputFormat.setOutputPath(job,interPath); + + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + job.setMapOutputValueClass(Put.class); + + job.setOutputKeyClass(ImmutableBytesWritable.class); + job.setOutputValueClass(Put.class); + + job.setNumReduceTasks(0); + + assertTrue(job.waitForCompletion(true)); + + //verify + HTable table = new HTable(conf, tableName); + Scan scan = new Scan(); + scan.addFamily(Bytes.toBytes("my_family")); + ResultScanner scanner = table.getScanner(scan); + int index=0; + for(Result result: scanner) { + String vals[] = data[index].toString().split(","); + for(int i=1;i el: getHbaseConf()) { + if(el.getKey().startsWith("hbase.")) { + conf.set(el.getKey(),el.getValue()); + } + } + + //create table + createTable(tableName,new String[]{familyName}); + + String data[] = {"1,english:one,spanish:uno", + "2,english:two,spanish:dos", + "3,english:three,spanish:tres"}; + + + + // input/output settings + Path inputPath = new Path(getTestDir(), "mapred/testHCatMapReduceInput/"); + getFileSystem().mkdirs(inputPath); + FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt")); + for(String line: data) + os.write(Bytes.toBytes(line + "\n")); + os.close(); + 
Path interPath = new Path(getTestDir()+"/ImportSequenceFileTest/inter"); + Path scratchPath = new Path(getTestDir()+"/ImportSequenceFileTest/scratch"); + + + //create job + Job job = new Job(conf, "sequence file write"); + job.setWorkingDirectory(new Path(getTestDir(),"importSequenceFileTest_MR")); + job.setJarByClass(this.getClass()); + job.setMapperClass(MapWrite.class); + + job.setInputFormatClass(TextInputFormat.class); + TextInputFormat.setInputPaths(job, inputPath); + + job.setOutputFormatClass(SequenceFileOutputFormat.class); + SequenceFileOutputFormat.setOutputPath(job,interPath); + + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + job.setMapOutputValueClass(Put.class); + + job.setOutputKeyClass(ImmutableBytesWritable.class); + job.setOutputValueClass(Put.class); + + job.setNumReduceTasks(0); + + assertTrue(job.waitForCompletion(true)); + + assertTrue(ImportSequenceFile.runJob(job.getConfiguration(),tableName,interPath,scratchPath)); + + //verify + HTable table = new HTable(conf, tableName); + Scan scan = new Scan(); + scan.addFamily(Bytes.toBytes("my_family")); + ResultScanner scanner = table.getScanner(scan); + int index=0; + for(Result result: scanner) { + String vals[] = data[index].toString().split(","); + for(int i=1;i el: getHbaseConf()) { + if(el.getKey().startsWith("hbase.")) { + conf.set(el.getKey(),el.getValue()); + } + } + + conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(getHiveConf().getAllProperties())); + + //create table + createTable(tableName,new String[]{familyName}); + registerHBaseTable(tableName); + + + String data[] = {"1,english:ONE,spanish:UNO", + "2,english:ONE,spanish:DOS", + "3,english:ONE,spanish:TRES"}; + + + + // input/output settings + Path inputPath = new Path(getTestDir(), "mapred/testHCatMapReduceInput/"); + getFileSystem().mkdirs(inputPath); + FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt")); + for(String line: data) + os.write(Bytes.toBytes(line + "\n")); + os.close(); + + //create job + Job job = new Job(conf, "hcat mapreduce write test"); + job.setWorkingDirectory(new Path(getTestDir(),"hbaseOutputStorageDriverTest_MR")); + job.setJarByClass(this.getClass()); + job.setMapperClass(MapHCatWrite.class); + + job.setInputFormatClass(TextInputFormat.class); + TextInputFormat.setInputPaths(job, inputPath); + + + job.setOutputFormatClass(HCatOutputFormat.class); + OutputJobInfo outputJobInfo = OutputJobInfo.create(null,tableName,null,null,null); + outputJobInfo.getProperties().put(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY, "1"); + HCatOutputFormat.setOutput(job,outputJobInfo); + + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + job.setMapOutputValueClass(HCatRecord.class); + + job.setOutputKeyClass(ImmutableBytesWritable.class); + job.setOutputValueClass(Put.class); + + job.setNumReduceTasks(0); + + assertTrue(job.waitForCompletion(true)); + + //verify + HTable table = new HTable(conf, tableName); + Scan scan = new Scan(); + scan.addFamily(Bytes.toBytes("my_family")); + ResultScanner scanner = table.getScanner(scan); + int index=0; + for(Result result: scanner) { + String vals[] = data[index].toString().split(","); + for(int i=1;i el: getHbaseConf()) { + if(el.getKey().startsWith("hbase.")) { + conf.set(el.getKey(),el.getValue()); + } + } + + conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(getHiveConf().getAllProperties())); + + //create table + createTable(tableName,new String[]{familyName}); + registerHBaseTable(tableName); + + + String data[] = 
{"1,english:ONE,spanish:UNO", + "2,english:ONE,spanish:DOS", + "3,english:ONE,spanish:TRES"}; + + + + // input/output settings + Path inputPath = new Path(getTestDir(), "mapred/testHCatMapReduceInput/"); + getFileSystem().mkdirs(inputPath); + FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt")); + for(String line: data) + os.write(Bytes.toBytes(line + "\n")); + os.close(); + + long ubTimestamp = System.currentTimeMillis(); + long lbTimestamp = ubTimestamp; + //create job + Job job = new Job(conf, "hcat mapreduce write test"); + job.setWorkingDirectory(new Path(getTestDir(),"hbaseOutputStorageDriverTest_MR")); + job.setJarByClass(this.getClass()); + job.setMapperClass(MapHCatWrite.class); + + job.setInputFormatClass(TextInputFormat.class); + TextInputFormat.setInputPaths(job, inputPath); + + + job.setOutputFormatClass(HCatOutputFormat.class); + OutputJobInfo outputJobInfo = OutputJobInfo.create(null,tableName,null,null,null); + HCatOutputFormat.setOutput(job,outputJobInfo); + ubTimestamp = System.currentTimeMillis(); + System.out.println("ub: "+ubTimestamp); + + + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + job.setMapOutputValueClass(HCatRecord.class); + + job.setOutputKeyClass(ImmutableBytesWritable.class); + job.setOutputValueClass(Put.class); + + job.setNumReduceTasks(0); + + assertTrue(job.waitForCompletion(true)); + + //verify + HTable table = new HTable(conf, tableName); + Scan scan = new Scan(); + scan.addFamily(Bytes.toBytes("my_family")); + ResultScanner scanner = table.getScanner(scan); + int index=0; + Long prevTimestamp = null; + for(Result result: scanner) { + String vals[] = data[index].toString().split(","); + for(int i=1;i