diff --git storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java
index 8ba0e8a..5a9eda2 100644
--- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java
+++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java
@@ -30,11 +30,14 @@ import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.hbase.snapshot.RevisionManager;
+import org.apache.hcatalog.hbase.snapshot.Transaction;
 import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
 import org.apache.hcatalog.mapreduce.HCatTableInfo;
 import org.apache.hcatalog.mapreduce.OutputJobInfo;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -62,7 +65,6 @@ abstract class HBaseBaseOutputStorageDriver extends HCatOutputStorageDriver {
         hcatProperties = (Properties)hcatProperties.clone();
         super.initialize(context, hcatProperties);
 
-
         String jobString = context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
         if( jobString == null ) {
             throw new IOException("OutputJobInfo information not found in JobContext. HCatInputFormat.setOutput() not called?");
@@ -75,16 +77,9 @@ abstract class HBaseBaseOutputStorageDriver extends HCatOutputStorageDriver {
         outputJobInfo.getProperties().putAll(hcatProperties);
         hcatProperties = outputJobInfo.getProperties();
 
-
-        String revision = outputJobInfo.getProperties().getProperty(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY);
-        if(revision == null) {
-            outputJobInfo.getProperties()
-                         .setProperty(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY,
-                                      Long.toString(System.currentTimeMillis()));
-        }
-
         tableInfo = outputJobInfo.getTableInfo();
         schema = tableInfo.getDataColumns();
+        String fullyQualifiedName = HBaseHCatStorageHandler.getFullyQualifiedName(tableInfo);
 
         List<FieldSchema> fields = HCatUtil.getFieldSchemaList(outputSchema.getFields());
         hcatProperties.setProperty(Constants.LIST_COLUMNS,
@@ -93,10 +88,31 @@ abstract class HBaseBaseOutputStorageDriver extends HCatOutputStorageDriver {
                                    MetaStoreUtils.getColumnTypesFromFieldSchema(fields));
 
         //outputSchema should be set by HCatOutputFormat calling setSchema, prior to initialize being called
+        //TODO reconcile output_revision passing to HBaseSerDeResultConverter
+        //on the first call to this method hcatProperties will not contain an OUTPUT_VERSION, but that doesn't
+        //matter since we won't use any facilities that require that property to be set during that run
         converter = new HBaseSerDeResultConverter(schema,
                                                   outputSchema,
                                                   hcatProperties);
-        context.getConfiguration().set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, HBaseHCatStorageHandler.getFullyQualifiedName(tableInfo));
+        context.getConfiguration().set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, fullyQualifiedName);
+
+        String txnString = outputJobInfo.getProperties().getProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY);
+        if(txnString == null) {
+            RevisionManager rm = HBaseHCatStorageHandler.getOpenedRevisionManager(context.getConfiguration());
+            Transaction txn = null;
+            try {
+                txn = rm.beginWriteTransaction(fullyQualifiedName,
+                                               Arrays.asList(converter.getHBaseScanColumns().split(" ")));
+            } finally {
+                rm.close();
+            }
+            outputJobInfo.getProperties()
+                         .setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY,
+                                      HCatUtil.serialize(txn));
+            outputJobInfo.getProperties()
+                         .setProperty(HBaseSerDeResultConverter.PROPERTY_OUTPUT_VERSION_KEY,
+                                      Long.toString(txn.getRevisionNumber()));
+        }
     }
 
     @Override
diff --git storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
index 004e3df..25c2d21 100644
--- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
+++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
@@ -1,5 +1,7 @@
 package org.apache.hcatalog.hbase;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -17,6 +19,12 @@ import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.hbase.snapshot.RevisionManager;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerFactory;
+import org.apache.hcatalog.hbase.snapshot.Transaction;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
 
 import java.io.IOException;
 
@@ -28,6 +36,7 @@ import java.io.IOException;
 class HBaseBulkOutputFormat extends OutputFormat<WritableComparable<?>,Put> {
     private final static ImmutableBytesWritable EMPTY_LIST = new ImmutableBytesWritable(new byte[0]);
     private SequenceFileOutputFormat<WritableComparable<?>,Put> baseOutputFormat;
+    private final static Log LOG = LogFactory.getLog(HBaseBulkOutputFormat.class);
 
     public HBaseBulkOutputFormat() {
         baseOutputFormat = new SequenceFileOutputFormat<WritableComparable<?>,Put>();
@@ -110,35 +119,36 @@ class HBaseBulkOutputFormat extends OutputFormat<WritableComparable<?>,Put> {
 
         @Override
         public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
+            RevisionManager rm = null;
             try {
                 baseOutputCommitter.abortJob(jobContext,state);
+                rm = HBaseHCatStorageHandler.getOpenedRevisionManager(jobContext.getConfiguration());
+                rm.abortWriteTransaction(HBaseHCatStorageHandler.getWriteTransaction(jobContext.getConfiguration()));
             } finally {
                 cleanIntermediate(jobContext);
-            }
-        }
-
-        @Override
-        public void cleanupJob(JobContext context) throws IOException {
-            try {
-                baseOutputCommitter.cleanupJob(context);
-            } finally {
-                cleanIntermediate(context);
+                if(rm != null)
+                    rm.close();
             }
         }
 
         @Override
         public void commitJob(JobContext jobContext) throws IOException {
+            RevisionManager rm = null;
             try {
                 baseOutputCommitter.commitJob(jobContext);
                 Configuration conf = jobContext.getConfiguration();
                 Path srcPath = FileOutputFormat.getOutputPath(jobContext);
                 Path destPath = new Path(srcPath.getParent(),srcPath.getName()+"_hfiles");
                 ImportSequenceFile.runJob(jobContext,
-                        conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY),
-                        srcPath,
-                        destPath);
-            } finally {
+                                          conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY),
+                                          srcPath,
+                                          destPath);
+                rm = HBaseHCatStorageHandler.getOpenedRevisionManager(jobContext.getConfiguration());
+                rm.commitWriteTransaction(HBaseHCatStorageHandler.getWriteTransaction(jobContext.getConfiguration()));
                 cleanIntermediate(jobContext);
+            } finally {
+                if(rm != null)
+                    rm.close();
             }
         }
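Note: both committers above drive the same revision-manager write lifecycle: a transaction is begun at job setup, then committed or aborted from the job-level hooks, with the manager closed in a finally block either way. A minimal sketch of that lifecycle, using only APIs that appear in this patch (the "default.mytable" and "family" names are illustrative):

    // sketch of the write-transaction lifecycle the committers above rely on
    Configuration conf = jobContext.getConfiguration();
    RevisionManager rm = HBaseHCatStorageHandler.getOpenedRevisionManager(conf);
    try {
        // begin: reserves a revision number for this job's writes
        Transaction txn = rm.beginWriteTransaction("default.mytable", Arrays.asList("family"));
        try {
            // ... the job writes data stamped with txn.getRevisionNumber() ...
            rm.commitWriteTransaction(txn);
        } catch (IOException e) {
            rm.abortWriteTransaction(txn);
            throw e;
        }
    } finally {
        rm.close();  // release the revision manager whether or not the commit happened
    }
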
diff --git storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java
index bdc06ce..11e2b26 100644
--- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java
+++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.hbase.snapshot.Transaction;
 
 import java.io.IOException;
 import java.util.List;
@@ -55,10 +56,10 @@ public class HBaseBulkOutputStorageDriver extends HBaseBaseOutputStorageDriver {
         //initialize() gets called multiple time in the lifecycle of an MR job, client, mapper, reducer, etc
         //depending on the case we have to make sure for some context variables we set here that they don't get set again
         if(!outputJobInfo.getProperties().containsKey(PROPERTY_INT_OUTPUT_LOCATION)) {
+            Transaction txn = (Transaction)
+                    HCatUtil.deserialize(outputJobInfo.getProperties().getProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY));
             String tableLocation = context.getConfiguration().get(PROPERTY_TABLE_LOCATION);
-            String location = new Path(tableLocation,
-                    "REVISION_"+outputJobInfo.getProperties()
-                            .getProperty(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY)).toString();
+            String location = new Path(tableLocation, "REVISION_"+txn.getRevisionNumber()).toString();
             outputJobInfo.getProperties().setProperty(PROPERTY_INT_OUTPUT_LOCATION, location);
             //We are writing out an intermediate sequenceFile hence location is not passed in OutputJobInfo.getLocation()
             //TODO replace this with a mapreduce constant when available
diff --git storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java
index 199f08c..cb5b388 100644
--- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java
+++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java
@@ -26,8 +26,8 @@ import org.apache.hcatalog.common.HCatConstants;
  */
 class HBaseConstants {
 
-    /** key used to define th version number HBaseOutputStorage driver to use when writing out data for a job */
-    public static final String PROPERTY_OUTPUT_VERSION_KEY = HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".hbase.mapreduce.outputVersion";
+    /** key used to store the write transaction object for a job */
+    public static final String PROPERTY_WRITE_TXN_KEY = HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".hbase.mapreduce.writeTxn";
 
     /** key used to define the name of the table to write to */
     public static final String PROPERTY_OUTPUT_TABLE_NAME_KEY = HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".hbase.mapreduce.outputTableName";
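The bulk driver now derives its intermediate output location from the deserialized transaction instead of a raw outputVersion property. A hedged sketch of that round trip (the property key and the HCatUtil helpers are the ones used above; txn, outputJobInfo, and tableLocation are illustrative local variables):

    // client side: stash the serialized transaction in the job's output properties
    outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY,
                                              HCatUtil.serialize(txn));

    // task side: recover the transaction and build the per-revision location
    Transaction restored = (Transaction) HCatUtil.deserialize(
            outputJobInfo.getProperties().getProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY));
    String location = new Path(tableLocation, "REVISION_" + restored.getRevisionNumber()).toString();
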
diff --git storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java
index 706c911..26f511a 100644
--- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java
+++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java
@@ -24,10 +24,12 @@ import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.hbase.snapshot.RevisionManager;
 
 import java.io.IOException;
 
@@ -57,7 +59,7 @@ class HBaseDirectOutputFormat extends OutputFormat<WritableComparable<?>,Writable>
 
     @Override
     public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
-        return outputFormat.getOutputCommitter(context);
+        return new HBaseDirectOutputCommitter(outputFormat.getOutputCommitter(context));
     }
 
     @Override
@@ -72,4 +74,63 @@ class HBaseDirectOutputFormat extends OutputFormat<WritableComparable<?>,Writable>
     public Configuration getConf() {
         return outputFormat.getConf();
     }
+
+    private static class HBaseDirectOutputCommitter extends OutputCommitter {
+        private OutputCommitter baseOutputCommitter;
+
+        public HBaseDirectOutputCommitter(OutputCommitter baseOutputCommitter) throws IOException {
+            this.baseOutputCommitter = baseOutputCommitter;
+        }
+
+        @Override
+        public void abortTask(TaskAttemptContext context) throws IOException {
+            baseOutputCommitter.abortTask(context);
+        }
+
+        @Override
+        public void commitTask(TaskAttemptContext context) throws IOException {
+            baseOutputCommitter.commitTask(context);
+        }
+
+        @Override
+        public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
+            return baseOutputCommitter.needsTaskCommit(context);
+        }
+
+        @Override
+        public void setupJob(JobContext context) throws IOException {
+            baseOutputCommitter.setupJob(context);
+        }
+
+        @Override
+        public void setupTask(TaskAttemptContext context) throws IOException {
+            baseOutputCommitter.setupTask(context);
+        }
+
+        @Override
+        public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
+            RevisionManager rm = null;
+            try {
+                baseOutputCommitter.abortJob(jobContext, state);
+                rm = HBaseHCatStorageHandler.getOpenedRevisionManager(jobContext.getConfiguration());
+                rm.abortWriteTransaction(HBaseHCatStorageHandler.getWriteTransaction(jobContext.getConfiguration()));
+            } finally {
+                if(rm != null)
+                    rm.close();
+            }
+        }
+
+        @Override
+        public void commitJob(JobContext jobContext) throws IOException {
+            RevisionManager rm = null;
+            try {
+                baseOutputCommitter.commitJob(jobContext);
+                rm = HBaseHCatStorageHandler.getOpenedRevisionManager(jobContext.getConfiguration());
+                rm.commitWriteTransaction(HBaseHCatStorageHandler.getWriteTransaction(jobContext.getConfiguration()));
+            } finally {
+                if(rm != null)
+                    rm.close();
+            }
+        }
+    }
 }
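HBaseDirectOutputCommitter is a plain delegating wrapper: every task-level call forwards to the wrapped committer unchanged, and only commitJob/abortJob add the transaction bookkeeping. Wiring it up is what getOutputCommitter() above does, roughly:

    // the base committer runs first; only if it succeeds is the write transaction committed
    OutputCommitter committer =
            new HBaseDirectOutputCommitter(outputFormat.getOutputCommitter(context));

Keeping rm.close() in a finally block is the design choice that guarantees the revision manager is released even when the base committer or the transaction call throws.
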
diff --git storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java
index e8dc3ad..0b37b9f 100644
--- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java
+++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java
@@ -48,12 +48,14 @@ import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.hbase.snapshot.RevisionManager;
 import org.apache.hcatalog.hbase.snapshot.RevisionManagerFactory;
 import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
+import org.apache.hcatalog.hbase.snapshot.Transaction;
 import org.apache.hcatalog.hbase.snapshot.ZKBasedRevisionManager;
 import org.apache.hcatalog.mapreduce.HCatInputStorageDriver;
 import org.apache.hcatalog.mapreduce.HCatOutputFormat;
@@ -539,5 +541,32 @@ public class HBaseHCatStorageHandler extends HCatStorageHandler {
                 HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY, serializedSnp);
     }
 
+    static Transaction getWriteTransaction(Configuration conf) throws IOException {
+        OutputJobInfo outputJobInfo = (OutputJobInfo)HCatUtil.deserialize(conf.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
+        return (Transaction) HCatUtil.deserialize(outputJobInfo.getProperties()
+                                                               .getProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY));
+    }
+
+    static void setWriteTransaction(Configuration conf, Transaction txn) throws IOException {
+        OutputJobInfo outputJobInfo = (OutputJobInfo)HCatUtil.deserialize(conf.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
+        outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY, HCatUtil.serialize(txn));
+        conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
+    }
+
+    static RevisionManager getOpenedRevisionManager(Configuration conf) throws IOException {
+        RevisionManager rm = getRevisionManager(conf);
+        rm.open();
+        return rm;
+    }
+
+    /**
+     * Get the revision number that will be assigned to this job's output data
+     * @param conf configuration of the job
+     * @return the revision number used
+     * @throws IOException
+     */
+    public static long getOutputRevision(Configuration conf) throws IOException {
+        return getWriteTransaction(conf).getRevisionNumber();
+    }
 }
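getOutputRevision is the one public addition; it lets a client ask, after job setup, which revision number its output will be written under. A usage sketch (job is an illustrative, already-configured Job whose configuration holds the serialized OutputJobInfo, including the write transaction):

    // ask which revision number this job's writes will carry
    long revision = HBaseHCatStorageHandler.getOutputRevision(job.getConfiguration());
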
diff --git storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java
index 5eac5b8..1aaa88f 100644
--- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java
+++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.data.DefaultHCatRecord;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
@@ -58,6 +59,9 @@ import java.util.Properties;
  * {@link HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY}
  */
 class HBaseSerDeResultConverter implements ResultConverter {
+    /** key used to define the version number the HBase output storage driver should use when writing out data for a job */
+    static final String PROPERTY_OUTPUT_VERSION_KEY = "hcat.hbase.mapreduce.outputVersion";
+
     private HBaseSerDe serDe;
     private HCatSchema schema;
     private HCatSchema outputSchema;
@@ -79,8 +83,8 @@ class HBaseSerDeResultConverter implements ResultConverter {
         hbaseColumnMapping = hcatProperties.getProperty(HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY);
         hcatProperties.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,hbaseColumnMapping);
 
-        if(hcatProperties.containsKey(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY))
-            outputVersion = Long.parseLong(hcatProperties.getProperty(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY));
+        if(hcatProperties.containsKey(PROPERTY_OUTPUT_VERSION_KEY))
+            outputVersion = Long.parseLong(hcatProperties.getProperty(PROPERTY_OUTPUT_VERSION_KEY));
         else
             outputVersion = null;
 
diff --git storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java
index 57b4928..d5c0f23 100644
--- storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java
+++ storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java
@@ -30,12 +30,16 @@ import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.DefaultHCatRecord;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.hbase.snapshot.RevisionManager;
+import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
+import org.apache.hcatalog.hbase.snapshot.Transaction;
 import org.apache.hcatalog.mapreduce.HCatOutputFormat;
 import org.apache.hcatalog.mapreduce.OutputJobInfo;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
@@ -145,6 +149,19 @@ public class TestHBaseBulkOutputStorageDriver extends SkeletonHBaseTest {
         job.setOutputFormatClass(HBaseBulkOutputFormat.class);
         SequenceFileOutputFormat.setOutputPath(job,interPath);
 
+        //manually create transaction
+        RevisionManager rm = HBaseHCatStorageHandler.getOpenedRevisionManager(conf);
+        try {
+            OutputJobInfo outputJobInfo = OutputJobInfo.create("default", tableName, null, null, null);
+            Transaction txn = rm.beginWriteTransaction(tableName, Arrays.asList(familyName));
+            outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY,
+                    HCatUtil.serialize(txn));
+            job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_INFO,
+                    HCatUtil.serialize(outputJobInfo));
+        } finally {
+            rm.close();
+        }
+
         job.setMapOutputKeyClass(ImmutableBytesWritable.class);
         job.setMapOutputValueClass(HCatRecord.class);
 
@@ -323,113 +340,16 @@ public class TestHBaseBulkOutputStorageDriver extends SkeletonHBaseTest {
 
         job.setNumReduceTasks(0);
 
-        long ubTimestamp = System.currentTimeMillis();
-        long lbTimestamp = ubTimestamp;
-
         assertTrue(job.waitForCompletion(true));
-
-        ubTimestamp = System.currentTimeMillis();
-
-        //verify
-        HTable table = new HTable(conf, databaseName+"."+tableName);
-        Scan scan = new Scan();
-        scan.addFamily(familyNameBytes);
-        ResultScanner scanner = table.getScanner(scan);
-        long prevTimestamp = -1;
-        int index=0;
-        for(Result result: scanner) {
-            String vals[] = data[index].toString().split(",");
-            for(int i=1;i