Index: src/java/org/apache/hcatalog/pig/HCatStorer.java
===================================================================
--- src/java/org/apache/hcatalog/pig/HCatStorer.java	(revision 1299921)
+++ src/java/org/apache/hcatalog/pig/HCatStorer.java	(working copy)
@@ -70,27 +70,31 @@
   @Override
   public void setStoreLocation(String location, Job job) throws IOException {
-
     job.getConfiguration().set(INNER_SIGNATURE, INNER_SIGNATURE_PREFIX + "_" + sign);
     Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign});
 
     String[] userStr = location.split("\\.");
     OutputJobInfo outputJobInfo;
-    if(userStr.length == 2) {
-      outputJobInfo = OutputJobInfo.create(userStr[0],
+    Object outInfo = p.get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+    if (outInfo != null) {
+      String outInfoString = (String) outInfo;
+      outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(outInfoString);
+    } else {
+      if(userStr.length == 2) {
+        outputJobInfo = OutputJobInfo.create(userStr[0],
                                            userStr[1],
                                            partitions);
-    } else if(userStr.length == 1) {
-      outputJobInfo = OutputJobInfo.create(null,
+      } else if(userStr.length == 1) {
+        outputJobInfo = OutputJobInfo.create(null,
                                            userStr[0],
                                            partitions);
-    } else {
-      throw new FrontendException("location "+location+" is invalid. It must be of the form [db.]table", PigHCatUtil.PIG_EXCEPTION_CODE);
+      } else {
+        throw new FrontendException("location "+location+" is invalid. It must be of the form [db.]table", PigHCatUtil.PIG_EXCEPTION_CODE);
+      }
     }
-
     Configuration config = job.getConfiguration();
     if(!HCatUtil.checkJobContextIfRunningFromBackend(job)){
@@ -123,6 +127,7 @@
       PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_TOKEN_SIGNATURE);
       PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE);
       PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM);
+      PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_OUTPUT_INFO);
 
       p.setProperty(COMPUTED_OUTPUT_SCHEMA,ObjectSerializer.serialize(computedSchema));
Index: storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java
===================================================================
--- storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java	(revision 1299921)
+++ storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java	(working copy)
@@ -138,6 +138,7 @@
         org.apache.hadoop.mapred.TextInputFormat.setInputPaths(job, inputPath);
         job.setOutputFormat(HBaseDirectOutputFormat.class);
+        job.set(TableOutputFormat.OUTPUT_TABLE, tableName);
         job.set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, tableName);
 
         //manually create transaction
Index: storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java
===================================================================
--- storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java	(revision 1299921)
+++ storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java	(working copy)
@@ -54,6 +54,7 @@
 import org.apache.hcatalog.data.DefaultHCatRecord;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.hbase.HBaseBulkOutputFormat.HBaseBulkOutputCommitter;
 import org.apache.hcatalog.hbase.TestHBaseDirectOutputFormat.MapReadAbortedTransaction;
 import org.apache.hcatalog.hbase.TestHBaseDirectOutputFormat.MapWriteAbortTransaction;
 import org.apache.hcatalog.hbase.snapshot.FamilyRevision;
@@ -203,6 +204,7 @@
         job.setOutputFormat(HBaseBulkOutputFormat.class);
         org.apache.hadoop.mapred.SequenceFileOutputFormat.setOutputPath(job, interPath);
+        job.setOutputCommitter(HBaseBulkOutputCommitter.class);
 
         //manually create transaction
         RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
Index: storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java
===================================================================
--- storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java	(revision 1299921)
+++ storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java	(working copy)
@@ -452,11 +452,12 @@
        assertTrue(doesTableExist);
 
        populateHBaseTable(tableName, 2);
-       populateHBaseTableQualifier1(tableName, 3, null); //Running transaction
-       populateHBaseTableQualifier1(tableName, 4, Boolean.FALSE); //Aborted transaction
-       populateHBaseTableQualifier1(tableName, 5, Boolean.TRUE); //Committed transaction
-       populateHBaseTableQualifier1(tableName, 6, null); //Running Transaction
-       populateHBaseTableQualifier1(tableName, 7, Boolean.FALSE); //Aborted Transaction
+       populateHBaseTableQualifier1(tableName, 3, Boolean.TRUE); //Committed transaction
+       populateHBaseTableQualifier1(tableName, 4, null); //Running transaction
+       populateHBaseTableQualifier1(tableName, 5, Boolean.FALSE); //Aborted transaction
+       populateHBaseTableQualifier1(tableName, 6, Boolean.TRUE); //Committed transaction
+       populateHBaseTableQualifier1(tableName, 7, null); //Running Transaction
+       populateHBaseTableQualifier1(tableName, 8, Boolean.FALSE); //Aborted Transaction
 
        Configuration conf = new Configuration(hcatConf);
        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
@@ -592,7 +593,7 @@
            System.out.println("HCat record value" + value.toString());
            boolean correctValues = (value.size() == 3)
                && (value.get(0).toString()).equalsIgnoreCase("testRow")
-               && (value.get(1).toString()).equalsIgnoreCase("textValue-2")
+               && (value.get(1).toString()).equalsIgnoreCase("textValue-3")
                && (value.get(2).toString()).equalsIgnoreCase("textValue-2");
 
            if (correctValues == false) {
Index: storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java
===================================================================
--- storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java	(revision 1299921)
+++ storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java	(working copy)
@@ -80,19 +80,16 @@
     public void restart(byte[] firstRow) throws IOException {
         allAbortedTransactions = getAbortedTransactions(Bytes.toString(htable.getTableName()), scan);
-        long maxValidRevision = snapshot.getLatestRevision();
+        long maxValidRevision = getMaximumRevision(scan, snapshot);
         while (allAbortedTransactions.contains(maxValidRevision)) {
             maxValidRevision--;
         }
-        long minValidRevision = getMinimumRevision(scan, snapshot);
-        while (allAbortedTransactions.contains(minValidRevision)) {
-            minValidRevision--;
-        }
         Scan newScan = new Scan(scan);
         newScan.setStartRow(firstRow);
         //TODO: See if filters in 0.92 can be used to optimize the scan
         //TODO: Consider create a custom snapshot filter
-        newScan.setTimeRange(minValidRevision, maxValidRevision + 1);
+        //TODO: Make min revision a constant in RM
+        newScan.setTimeRange(0, maxValidRevision + 1);
         newScan.setMaxVersions();
         this.scanner = this.htable.getScanner(newScan);
         resultItr = this.scanner.iterator();
@@ -120,16 +117,16 @@
         }
     }
 
-    private long getMinimumRevision(Scan scan, TableSnapshot snapshot) {
-        long minRevision = snapshot.getLatestRevision();
+    private long getMaximumRevision(Scan scan, TableSnapshot snapshot) {
+        long maxRevision = 0;
         byte[][] families = scan.getFamilies();
         for (byte[] familyKey : families) {
             String family = Bytes.toString(familyKey);
             long revision = snapshot.getRevision(family);
-            if (revision < minRevision)
-                minRevision = revision;
+            if (revision > maxRevision)
+                maxRevision = revision;
         }
-        return minRevision;
+        return maxRevision;
     }
 
     /*
Index: storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java
===================================================================
--- storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java	(revision 1299921)
+++ storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java	(working copy)
@@ -19,7 +19,6 @@
 package org.apache.hcatalog.hbase;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -27,6 +26,7 @@
 import java.util.Map.Entry;
 import java.util.Set;
 
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -36,6 +36,7 @@
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.mapred.TableOutputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -48,14 +49,15 @@
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
 import org.apache.hadoop.hive.serde2.SerDe;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.hbase.HBaseBulkOutputFormat.HBaseBulkOutputCommitter;
+import org.apache.hcatalog.hbase.HBaseDirectOutputFormat.HBaseDirectOutputCommitter;
 import org.apache.hcatalog.hbase.snapshot.RevisionManager;
 import org.apache.hcatalog.hbase.snapshot.Transaction;
 import org.apache.hcatalog.hbase.snapshot.ZKBasedRevisionManager;
@@ -68,20 +70,20 @@
 import org.apache.zookeeper.ZooKeeper;
 
 import com.facebook.fb303.FacebookBase;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * This class HBaseHCatStorageHandler provides functionality to create HBase
  * tables through HCatalog. The implementation is very similar to the
  * HiveHBaseStorageHandler, with more details to suit HCatalog.
  */
-//TODO remove serializable when HCATALOG-282 is fixed
-public class HBaseHCatStorageHandler extends HCatStorageHandler implements HiveMetaHook, Serializable {
+public class HBaseHCatStorageHandler extends HCatStorageHandler implements HiveMetaHook, Configurable {
 
     public final static String DEFAULT_PREFIX = "default.";
     private final static String PROPERTY_INT_OUTPUT_LOCATION = "hcat.hbase.mapreduce.intermediateOutputLocation";
 
-    private transient Configuration hbaseConf;
-    private transient HBaseAdmin admin;
+    private Configuration hbaseConf;
+    private HBaseAdmin admin;
 
     @Override
     public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
@@ -96,20 +98,32 @@
             jobProperties.put(TableInputFormat.INPUT_TABLE, qualifiedTableName);
 
             Configuration jobConf = getConf();
+            addHbaseResources(jobConf, jobProperties);
+            JobConf copy = new JobConf(jobConf);
+            HBaseConfiguration.addHbaseResources(copy);
+            //Getting hbase delegation token in getInputSplits does not work with PIG. So need to
+            //do it here
+            if (jobConf instanceof JobConf) {
+                //The JobConf copy will have a reference to the original's credentials if it was a JobConf.
+                HBaseUtil.addHBaseDelegationToken(copy);
+            }
+
             String outputSchema = jobConf.get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA);
             jobProperties.put(TableInputFormat.SCAN_COLUMNS, getScanColumns(tableInfo, outputSchema));
 
             String serSnapshot = (String) inputJobInfo.getProperties().get(
                 HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY);
             if (serSnapshot == null) {
-                Configuration conf = addHbaseResources(jobConf);
-                HCatTableSnapshot snapshot = HBaseRevisionManagerUtil.createSnapshot(conf,
+                HCatTableSnapshot snapshot = HBaseRevisionManagerUtil.createSnapshot(copy,
                     qualifiedTableName, tableInfo);
                 jobProperties.put(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY, HCatUtil.serialize(snapshot));
             }
 
-            addHbaseResources(jobConf, jobProperties);
+            //This adds it directly to the jobConf. Setting in jobProperties does not get propagated
+            //to JobConf as of now as the jobProperties is maintained per partition
+            addOutputDependencyJars(jobConf);
+            jobProperties.put("tmpjars", jobConf.get("tmpjars"));
 
         } catch (IOException e) {
             throw new IllegalStateException("Error while configuring job properties", e);
@@ -128,33 +142,47 @@
             HCatTableInfo tableInfo = outputJobInfo.getTableInfo();
             String qualifiedTableName = HBaseHCatStorageHandler.getFullyQualifiedName(tableInfo);
             jobProperties.put(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, qualifiedTableName);
+            jobProperties.put(TableOutputFormat.OUTPUT_TABLE, qualifiedTableName);
 
             Configuration jobConf = getConf();
+            addHbaseResources(jobConf, jobProperties);
+
+            JobConf copy = new JobConf(jobConf);
+            HBaseConfiguration.addHbaseResources(copy);
+
             String txnString = outputJobInfo.getProperties().getProperty(
                 HBaseConstants.PROPERTY_WRITE_TXN_KEY);
-            if (txnString == null) {
-                Configuration conf = addHbaseResources(jobConf);
-                Transaction txn = HBaseRevisionManagerUtil.beginWriteTransaction(qualifiedTableName, tableInfo, conf);
+            String jobTxnString = jobConf.get(HBaseConstants.PROPERTY_WRITE_TXN_KEY);
+            //Pig makes 3 calls to HCatOutputFormat.setOutput(HCatStorer) with different JobConf
+            //which leads to creating 2 transactions.
+            //So apart from fixing HCatStorer to pass same OutputJobInfo, trying to make it fool proof for other such cases.
+            Transaction txn = null;
+            if (txnString == null && jobTxnString == null) {
+                txn = HBaseRevisionManagerUtil.beginWriteTransaction(qualifiedTableName, tableInfo, copy);
+                String serializedTxn = HCatUtil.serialize(txn);
                 outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY,
-                    HCatUtil.serialize(txn));
-
-                if (isBulkMode(outputJobInfo) && !(outputJobInfo.getProperties()
-                        .containsKey(PROPERTY_INT_OUTPUT_LOCATION))) {
-                    String tableLocation = tableInfo.getTableLocation();
-                    String location = new Path(tableLocation, "REVISION_" + txn.getRevisionNumber())
-                        .toString();
-                    outputJobInfo.getProperties().setProperty(PROPERTY_INT_OUTPUT_LOCATION,
-                        location);
-                    // We are writing out an intermediate sequenceFile hence
-                    // location is not passed in OutputJobInfo.getLocation()
-                    // TODO replace this with a mapreduce constant when available
-                    jobProperties.put("mapred.output.dir", location);
-                }
+                    serializedTxn);
+                jobProperties.put(HBaseConstants.PROPERTY_WRITE_TXN_KEY, serializedTxn);
             }
+            if (txn == null) {
+                txn = (Transaction) HCatUtil.deserialize(txnString);
+                jobProperties.put(HBaseConstants.PROPERTY_WRITE_TXN_KEY, txnString);
+            }
+            if (isBulkMode(outputJobInfo)) {
+                String tableLocation = tableInfo.getTableLocation();
+                String location = new Path(tableLocation, "REVISION_" + txn.getRevisionNumber())
+                    .toString();
+                outputJobInfo.getProperties().setProperty(PROPERTY_INT_OUTPUT_LOCATION, location);
+                // We are writing out an intermediate sequenceFile hence
+                // location is not passed in OutputJobInfo.getLocation()
+                // TODO replace this with a mapreduce constant when available
+                jobProperties.put("mapred.output.dir", location);
+                jobProperties.put("mapred.output.committer.class", HBaseBulkOutputCommitter.class.getName());
+            } else {
+                jobProperties.put("mapred.output.committer.class", HBaseDirectOutputCommitter.class.getName());
+            }
 
-            jobProperties
-                .put(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
-            addHbaseResources(jobConf, jobProperties);
+            jobProperties.put(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
             addOutputDependencyJars(jobConf);
             jobProperties.put("tmpjars", jobConf.get("tmpjars"));
@@ -429,7 +457,10 @@
 
     @Override
     public void setConf(Configuration conf) {
-        hbaseConf = HBaseConfiguration.create(conf);
+        //Not cloning as we want to set tmpjars on it. Putting in jobProperties does not
+        //get propagated to JobConf in case of InputFormat as they are maintained per partition.
+        //Also we need to add hbase delegation token to the Credentials.
+        hbaseConf = conf;
     }
 
     private void checkDeleteTable(Table table) throws MetaException {
@@ -479,8 +510,6 @@
      */
    private void addOutputDependencyJars(Configuration conf) throws IOException {
        TableMapReduceUtil.addDependencyJars(conf,
-            //hadoop-core
-            Writable.class,
            //ZK
            ZooKeeper.class,
            //HBase
@@ -489,6 +518,8 @@
            HiveException.class,
            //HCatalog jar
            HCatOutputFormat.class,
+            //hcat hbase storage handler jar
+            HBaseHCatStorageHandler.class,
            //hive hbase storage handler jar
            HBaseSerDe.class,
            //hive jar
@@ -498,21 +529,12 @@
            //hbase jar
            Bytes.class,
            //thrift-fb303 .jar
-            FacebookBase.class);
+            FacebookBase.class,
+            //guava jar
+            ThreadFactoryBuilder.class);
     }
 
     /**
-     * Utility method to get a new Configuration with hbase-default.xml and hbase-site.xml added
-     * @param jobConf existing configuration
-     * @return a new Configuration with hbase-default.xml and hbase-site.xml added
-     */
-    private Configuration addHbaseResources(Configuration jobConf) {
-        Configuration conf = new Configuration(jobConf);
-        HBaseConfiguration.addHbaseResources(conf);
-        return conf;
-    }
-
-    /**
      * Utility method to add hbase-default.xml and hbase-site.xml properties to a new map
      * if they are not already present in the jobConf.
      * @param jobConf Job configuration
Index: storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java
===================================================================
--- storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java	(revision 1299921)
+++ storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java	(working copy)
@@ -63,9 +63,6 @@
 
     @Override
     public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
-        job.setOutputCommitter(HBaseDirectOutputCommitter.class);
-        job.setIfUnset(TableOutputFormat.OUTPUT_TABLE,
-            job.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY));
         outputFormat.checkOutputSpecs(ignored, job);
         HBaseUtil.addHBaseDelegationToken(job);
     }
Index: storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
===================================================================
--- storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java	(revision 1299921)
+++ storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java	(working copy)
@@ -61,9 +61,6 @@
 
     @Override
     public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
-        job.setOutputKeyClass(ImmutableBytesWritable.class);
-        job.setOutputValueClass(Put.class);
-        job.setOutputCommitter(HBaseBulkOutputCommitter.class);
         baseOutputFormat.checkOutputSpecs(ignored, job);
         HBaseUtil.addHBaseDelegationToken(job);
         addJTDelegationToken(job);
@@ -73,6 +70,8 @@
     public RecordWriter<WritableComparable<?>, Put> getRecordWriter(
         FileSystem ignored, JobConf job, String name, Progressable progress)
         throws IOException {
+        job.setOutputKeyClass(ImmutableBytesWritable.class);
+        job.setOutputValueClass(Put.class);
         long version = HBaseRevisionManagerUtil.getOutputRevision(job);
         return new HBaseBulkRecordWriter(baseOutputFormat.getRecordWriter(
             ignored, job, name, progress), version);
@@ -188,11 +187,21 @@
             try {
                 Configuration conf = jobContext.getConfiguration();
                 Path srcPath = FileOutputFormat.getOutputPath(jobContext.getJobConf());
+                if (!FileSystem.get(conf).exists(srcPath)) {
+                    throw new IOException("Failed to bulk import hfiles. " +
+                        "Intermediate data directory is cleaned up or missing. " +
+                        "Please look at the bulk import job if it exists for failure reason");
+                }
                 Path destPath = new Path(srcPath.getParent(), srcPath.getName() + "_hfiles");
-                ImportSequenceFile.runJob(jobContext,
+                boolean success = ImportSequenceFile.runJob(jobContext,
                     conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY),
                     srcPath, destPath);
+                if(!success) {
+                    cleanIntermediate(jobContext);
+                    throw new IOException("Failed to bulk import hfiles." +
+                        " Please look at the bulk import job for failure reason");
+                }
                 rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
                 rm.commitWriteTransaction(HBaseRevisionManagerUtil.getWriteTransaction(conf));
                 cleanIntermediate(jobContext);
Index: storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java
===================================================================
--- storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java	(revision 1299921)
+++ storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java	(working copy)
@@ -105,7 +105,6 @@
     public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplits)
         throws IOException {
         inputFormat.setConf(job);
-        HBaseUtil.addHBaseDelegationToken(job);
         return convertSplits(inputFormat.getSplits(HCatMapRedUtil.createJobContext(job, null,
             Reporter.NULL)));
     }