Index: src/java/org/apache/hcatalog/mapreduce/RandomAccessCommitter.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/RandomAccessCommitter.java (revision 0) +++ src/java/org/apache/hcatalog/mapreduce/RandomAccessCommitter.java (revision 0) @@ -0,0 +1,43 @@ +package org.apache.hcatalog.mapreduce; + +import org.apache.hadoop.mapreduce.JobContext; + +import java.io.IOException; + +/** + * Class that implements commit-semantics for RandomAccess. + * Analogous to {@link org.apache.hadoop.mapreduce.OutputCommitter}. + */ +public interface RandomAccessCommitter { + + /** + * Analogous to {@link org.apache.hadoop.mapreduce.OutputFormat}'s checkOutputSpecs(). + * This method is called by the random-access framework to prepare the RandomAccess instance for the job. + * (Job-specific setup can go in here. E.g. Creating data-snapshots, etc.) + * @param jobContext JobContext for the job. + * @throws IOException Exception on failure. + */ + public void prepareJob(JobContext jobContext) throws IOException; + + + /** + * Method called to set up random-access for a job. + * @param jobContext Context object for the job. + * @throws IOException Exception on setup failure. + */ + public void setupJob(JobContext jobContext) throws IOException; + + /** + * Method called to commit jobs. + * @param jobContext Context object for the job. + * @throws IOException Exception on commit failure. + */ + public void commitJob(JobContext jobContext) throws IOException; + + /** + * Method called to abort jobs. + * @param jobContext Context object for the job. + * @throws IOException Exception on abort failure. + */ + public void abortJob(JobContext jobContext) throws IOException; +} Index: src/java/org/apache/hcatalog/mapreduce/RandomAccessible.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/RandomAccessible.java (revision 0) +++ src/java/org/apache/hcatalog/mapreduce/RandomAccessible.java (revision 0) @@ -0,0 +1,24 @@ +package org.apache.hcatalog.mapreduce; + +import java.util.Map; + +/** + * Mix-in interface, indicating that the underlying HCat Table + * allows for random-access. + */ +public interface RandomAccessible +{ + + /** + * Method that prepares an HCat-RandomAccessible table for access from a MapReduce Job. + * @param jobProperties A properties-set, where Job-level properties may be set. + * @param randomAccessProperties A properties-set where Table-level properties may be set, during configuration. + */ + public void configureRandomAccess(Map jobProperties, Map randomAccessProperties); + + /** + * Method to retrieve a RandomAccess instance, specific to an HCat-table. + * @return The class that implements RandomAccess for the HCat table. 
+ */ + public Class getRandomAccessClass(); +} Index: src/java/org/apache/hcatalog/mapreduce/RandomAccess.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/RandomAccess.java (revision 0) +++ src/java/org/apache/hcatalog/mapreduce/RandomAccess.java (revision 0) @@ -0,0 +1,180 @@ +package org.apache.hcatalog.mapreduce; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hcatalog.common.HCatUtil; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Enables a user to perform random-reads from a table. + * E.g. To retrieve a row from a RandomAccessible table, using a specified key. + */ +public abstract class RandomAccess { + + public static final String CONF_KEY_RANDOM_ACCESS_ALIAS_PROPERTY_MAP = "mapreduce.random_access_alias_property_map"; + public static final String PROP_KEY_RANDOM_ACCESS_CLASS_NAME = "mapreduce.random_access.class"; + public static final String RANDOM_ACCESS_OUTPUT_FORMAT_CLASS_NAME = RandomAccessOutputFormat.class.getName(); + + /** + * Add (another) random access configuration to the configuration specified. + * @param alias The string name by which the random-access resource will be referenced. + * @param randomAccessClass The class that implements the RandomAccess. + * @param properties The (RandomAccess-class-specific) properties with which the class may be initialized. + * @param configToBeSavedInto The Job-configuration into which the RandomAccess parameters should be saved. + * @throws IOException Exception thrown if random-access config-information can't be serialized. + */ + public static void addRandomAccess(String alias, Class randomAccessClass, + HashMap properties, Configuration configToBeSavedInto) throws IOException { + if (alias == null || alias.equals("")) + throw new IllegalArgumentException("Invalid alias name: " + alias); + + if (properties == null) + properties = new HashMap(); + + String serializedMap = configToBeSavedInto.get(CONF_KEY_RANDOM_ACCESS_ALIAS_PROPERTY_MAP); + + HashMap alias_vs_property_map = (serializedMap == null)? new HashMap() : + (HashMap)HCatUtil.deserialize(serializedMap); + + properties.put(PROP_KEY_RANDOM_ACCESS_CLASS_NAME, randomAccessClass.getName()); + + alias_vs_property_map.put(alias, HCatUtil.serialize(properties)); + + configToBeSavedInto.set(CONF_KEY_RANDOM_ACCESS_ALIAS_PROPERTY_MAP, HCatUtil.serialize(alias_vs_property_map)); + + replaceOutputFormatClass(configToBeSavedInto); + } + + private static void replaceOutputFormatClass(Configuration config) { + String outputFormatClass = config.get("mapreduce.outputformat.class"); + + if (outputFormatClass == null) + throw new IllegalArgumentException("mapreduce.outputformat.class not set in config!"); + + if (!outputFormatClass.equals(RANDOM_ACCESS_OUTPUT_FORMAT_CLASS_NAME)) { + config.set(RandomAccessOutputFormat.CONF_KEY_RANDOM_ACCESS_ACTUAL_OUTPUTFORMAT, outputFormatClass); + config.set("mapreduce.outputformat.class", outputFormatClass); + } + } + + /** + * Method to retrieve the RandomAccess instance, saved in the specified Configuration object, + * using the specified alias. + * @param alias The String name by which the RandomAccess instance is referred to. 
+ * @param config The Configuration instance that stores the RandomAccess details, for that alias. + * @param Key-type (i.e. the type of the Keys used when using the RandomAccess instance.) + * @param Input-type (i.e. the data-type that will be used when writing to the RandomAccess.) + * @param Output-type (i.e. the data-type used when reading from the RandomAccess.) + * @return The re-constructed RandomAccess instance, corresponding to the specified alias. + * @throws IOException + */ + @SuppressWarnings("unchecked") + public static RandomAccess + getRandomAccess(String alias, Configuration config) throws IOException { + + String serializedMap = config.get(CONF_KEY_RANDOM_ACCESS_ALIAS_PROPERTY_MAP); + if (serializedMap == null) + throw new IllegalStateException("Could not retrieve info for table-alias: " + alias + ". Forgot addRandomAccess()?"); + + HashMap alias_vs_property_map = (HashMap)HCatUtil.deserialize(serializedMap); + String serializedProperties = alias_vs_property_map.get(alias); + if (serializedProperties == null) + throw new IllegalStateException("Could not retrieve info for table-alias: " + alias + ". Forgot addRandomAccess()?"); + + HashMap properties = (HashMap)HCatUtil.deserialize(serializedProperties); + + String randomAccessClass = properties.get(PROP_KEY_RANDOM_ACCESS_CLASS_NAME); + if (randomAccessClass == null) + throw new IllegalStateException("Could not retrieve random-access class for table-alias: " + alias); + + return getRandomAccess(randomAccessClass, properties, config); + } + + /** + * Method to retrieve all registered RandomAccess instances in the specified Configuration instance. + * @param config The Configuration object holding the RandomAccess information. + * @return A Collection of all RandomAccess instances, constructed, initialized and ready for use. + * @throws IOException On failure. + */ + static Collection getAllRandomAccess(Configuration config) throws IOException { + List randomAccessList = new ArrayList(); + + String serializedMap = config.get(CONF_KEY_RANDOM_ACCESS_ALIAS_PROPERTY_MAP); + if (serializedMap != null) { + HashMap alias_vs_property_map = (HashMap)HCatUtil.deserialize(serializedMap); + for (Map.Entry entry : alias_vs_property_map.entrySet()) { + HashMap properties = (HashMap)HCatUtil.deserialize(entry.getValue()); + String randomAccessClass = properties.get(PROP_KEY_RANDOM_ACCESS_CLASS_NAME); + if (randomAccessClass == null) + throw new IllegalStateException("Could not retrieve random-access class for table-alias: " + entry.getKey()); + + randomAccessList.add(getRandomAccess(randomAccessClass, properties, config)); + } + } + + return randomAccessList; + } + + /** + * Method to reflectively construct an instance of the RandomAccess (whose name is specified), + * and return an initialized instance. + * @param clazz The name of the RandomAccess (sub)class. + * @param properties The state-information associated with the RandomAccess. + * @param config The Configuration used to initialize the instance. + * @return The fully initialized RandomAccess instance. + * @throws IOException Exception, on failure.
+ */ + public static RandomAccess getRandomAccess(String clazz, Map properties, Configuration config) throws IOException { + Class randomAccessClass = getRandomAccessClass(clazz); + RandomAccess randomAccess = ReflectionUtils.newInstance(randomAccessClass, config); + randomAccess.initialize(properties); + return randomAccess; + } + + @SuppressWarnings("unchecked") + private static Class getRandomAccessClass(String className) { + try { + return (Class)Class.forName(className, true, JavaUtils.getClassLoader()); + } + catch(ClassNotFoundException exception) { + throw new IllegalArgumentException("Could not retrieve class: " + className, exception); + } + } + + /** + * Method to initialize a RandomAccess. Used in lieu of a constructor. + * Called after setConf(), if the RandomAccess implements + * {@link org.apache.hadoop.conf.Configurable} + * @param properties The implementation-specific properties with which the RandomAccess is initialized. + * @throws IOException Exception thrown if initialization fails. + */ + public abstract void initialize(Map properties) throws IOException; + + /** + * Returns the row corresponding to the specified key, + * from a RandomAccessible table. + * @param key The key for which the row is to be retrieved. + * @return The row corresponding to key. + * @throws IOException Exception thrown if records can't be retrieved. + */ + public abstract O get(K key) throws IOException; + + // Use I to implement putRecord(). + // public abstract void putRecord(K key, I input) throws IOException; + + /** + * Returns a RandomAccessCommitter instance for the RandomAccess. + * @param jobContext The jobContext corresponding to the MR job. + * @return RandomAccessCommitter instance. + * @throws IOException Exception, on failure. + */ + public abstract RandomAccessCommitter getCommitter(JobContext jobContext) throws IOException; +} Index: src/java/org/apache/hcatalog/mapreduce/HCatRandomAccess.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/HCatRandomAccess.java (revision 0) +++ src/java/org/apache/hcatalog/mapreduce/HCatRandomAccess.java (revision 0) @@ -0,0 +1,181 @@ +package org.apache.hcatalog.mapreduce; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hcatalog.common.ErrorType; +import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.common.HCatException; +import org.apache.hcatalog.common.HCatUtil; +import org.apache.hcatalog.data.HCatRecord; +import org.apache.hcatalog.data.HCatRecordSerDe; +import org.apache.hcatalog.data.schema.HCatSchema; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * The bridge between RandomAccess and HCatalog. 
+ * @param The type of the key used when accessing a record, using RandomAccess. + */ +public class HCatRandomAccess extends RandomAccess implements Configurable { + + private static Log LOG = LogFactory.getLog(HCatRandomAccess.class); + + private RandomAccess baseRandomAccess; + private InputJobInfo inputJobInfo; + private OutputJobInfo outputJobInfo; + private Configuration config; + private HCatRecordSerDe hCatSerDe; + private SerDe baseSerDe; + + public static HashMap createProperties(String dbName, String tableName, HCatSchema projection, + HashMap randomAccessProperties, Configuration config) throws IOException { + HiveMetaStoreClient metaStoreClient = null; + + try { + HiveConf hiveConf = HCatUtil.getHiveConf(config); + metaStoreClient = HCatUtil.createHiveClient(hiveConf); + Table table = metaStoreClient.getTable(dbName, tableName); + + StorageDescriptor storageDescriptor = table.getSd(); + StorerInfo storerInfo = extractStorerInfo(storageDescriptor, table.getParameters()); + + HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(config, storerInfo); + + if (!(storageHandler instanceof RandomAccessible)) + throw new IOException("Random Access not supported for " + getDBTableName(dbName, tableName)); + + RandomAccessible randomAccessibleTable = (RandomAccessible)storageHandler; + + InputJobInfo inputJobInfo = InputJobInfo.create(dbName, tableName, ""); + inputJobInfo.setTableInfo(HCatTableInfo.valueOf(table)); + + OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, tableName, null); + outputJobInfo.setOutputSchema(projection); + + randomAccessProperties.put(HCatConstants.HCAT_KEY_BASE_RANDOM_ACCESS, randomAccessibleTable.getRandomAccessClass().getName()); + randomAccessProperties.put(HCatConstants.HCAT_KEY_INPUT_JOB_INFO, HCatUtil.serialize(inputJobInfo)); + randomAccessProperties.put(HCatConstants.HCAT_KEY_OUTPUT_JOB_INFO, HCatUtil.serialize(outputJobInfo)); + randomAccessProperties.put(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(hiveConf.getAllProperties())); + randomAccessProperties.put(HCatConstants.HCAT_KEY_BASE_SERDE, storageHandler.getSerDeClass().getName()); + + // TODO: Figure out how to pass job-configuration settings back to the job. + randomAccessibleTable.configureRandomAccess(new HashMap(), randomAccessProperties); + + } + catch(Exception ex) { + throw (ex instanceof HCatException)? (HCatException)ex : new HCatException(ErrorType.ERROR_SET_OUTPUT, ex); + } + finally { + if (metaStoreClient != null) + metaStoreClient.close(); + } + + return randomAccessProperties; + } + + static StorerInfo extractStorerInfo(StorageDescriptor sd, Map tableParameters) throws IOException { + Properties hcatProperties = new Properties(); + for (String key : tableParameters.keySet()){ + hcatProperties.put(key, tableParameters.get(key)); + } + + // also populate with StorageDescriptor->SerDe.Parameters + for (Map.Entryparam : + sd.getSerdeInfo().getParameters().entrySet()) { + hcatProperties.put(param.getKey(), param.getValue()); + } + + return new StorerInfo(null, null, + sd.getInputFormat(), sd.getOutputFormat(), sd.getSerdeInfo().getSerializationLib(), + tableParameters.get(org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_STORAGE), + hcatProperties); + } + + private static String getDBTableName(String dbName, String tableName) { + return dbName + "." 
+ tableName; + } + + @Override + public void setConf(Configuration config) { + this.config = config; + } + + @Override + public Configuration getConf() { + return this.config; + } + + @Override + public void initialize(Map properties) throws IOException { + String baseRandomAccessClassName = properties.get(HCatConstants.HCAT_KEY_BASE_RANDOM_ACCESS); + if (baseRandomAccessClassName == null) { + throw new IllegalStateException("Could not retrieve underlying RandomAccess class."); + } + this.baseRandomAccess = RandomAccess.getRandomAccess(baseRandomAccessClassName, properties, config); + + String inputJobInfo = properties.get(HCatConstants.HCAT_KEY_INPUT_JOB_INFO); + if (inputJobInfo == null) { + throw new IllegalStateException("Couldn't retrieve InputJobInfo."); + } + this.inputJobInfo = (InputJobInfo)HCatUtil.deserialize(inputJobInfo); + + String outputJobInfo = properties.get(HCatConstants.HCAT_KEY_OUTPUT_JOB_INFO); + if (outputJobInfo == null) { + throw new IllegalStateException("Couldn't retrieve OutputJobInfo."); + } + this.outputJobInfo = (OutputJobInfo)HCatUtil.deserialize(outputJobInfo); + + this.hCatSerDe = (HCatRecordSerDe)getSerDe(HCatRecordSerDe.class.getName()); + this.baseSerDe = getSerDe(properties.get(HCatConstants.HCAT_KEY_BASE_SERDE)); + } + + @SuppressWarnings({"unchecked"}) + private SerDe getSerDe(String className) throws IOException { + try { + Class serDeClass = (Class)Class.forName(className); + SerDe serDe = ReflectionUtils.newInstance(serDeClass, config); + InternalUtil.initializeSerDe(serDe, config, inputJobInfo.getTableInfo(), outputJobInfo.getOutputSchema()); + return serDe; + } + catch (ClassNotFoundException classNotFound) { + throw new IOException("Could not instantiate serDe: " + className, classNotFound); + } + catch (SerDeException serDeException) { + throw new IOException("Could not initialize serDe: " + className, serDeException); + } + } + + @Override + public HCatRecord get(KeyType key) throws IOException { + try { + Writable writable = (Writable)baseRandomAccess.get(key); + Object lazyRow = baseSerDe.deserialize(writable); + return (HCatRecord)hCatSerDe.serialize(lazyRow, baseSerDe.getObjectInspector()); + } + catch(SerDeException exception) { + throw new IOException("Could not de-serialize: ", exception); + } + } + + @Override + public RandomAccessCommitter getCommitter(JobContext jobContext) throws IOException { + return baseRandomAccess.getCommitter(jobContext); + } +} Index: src/java/org/apache/hcatalog/mapreduce/RandomAccessOutputFormat.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/RandomAccessOutputFormat.java (revision 0) +++ src/java/org/apache/hcatalog/mapreduce/RandomAccessOutputFormat.java (revision 0) @@ -0,0 +1,51 @@ +package org.apache.hcatalog.mapreduce; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.util.ReflectionUtils; + +import java.io.IOException; + +class RandomAccessOutputFormat extends OutputFormat, Writable> { + + public static Log LOG = LogFactory.getLog(RandomAccessOutputFormat.class); + + public static 
final String CONF_KEY_RANDOM_ACCESS_ACTUAL_OUTPUTFORMAT = "mapreduce.lib.randomaccess.actual.outputformat"; + + @Override + public RecordWriter, Writable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { + return getUnderlyingOutputFormat(taskAttemptContext.getConfiguration()).getRecordWriter(taskAttemptContext); + } + + @Override + public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException { + for (RandomAccess randomAccess : RandomAccess.getAllRandomAccess(jobContext.getConfiguration())) + randomAccess.getCommitter(jobContext).prepareJob(jobContext); + + getUnderlyingOutputFormat(jobContext.getConfiguration()).checkOutputSpecs(jobContext); + } + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { + return getUnderlyingOutputFormat(taskAttemptContext.getConfiguration()).getOutputCommitter(taskAttemptContext); + } + + private static OutputFormat getUnderlyingOutputFormat(Configuration config) { + try { + String underlyingOFClassName = config.get(CONF_KEY_RANDOM_ACCESS_ACTUAL_OUTPUTFORMAT); + return (OutputFormat)ReflectionUtils.newInstance(Class.forName(underlyingOFClassName), config); + } + catch (ClassNotFoundException exception) { + LOG.error("Could not retrieve underlying OutputFormat.", exception); + throw new IllegalStateException("Unexpected exception. Could not retrieve underlying OutputFormat.", exception); + } + } +} Index: src/java/org/apache/hcatalog/common/HCatConstants.java =================================================================== --- src/java/org/apache/hcatalog/common/HCatConstants.java (revision 1306669) +++ src/java/org/apache/hcatalog/common/HCatConstants.java (working copy) @@ -76,6 +76,10 @@ public static final String HCAT_KEY_TOKEN_SIGNATURE = HCAT_KEY_OUTPUT_BASE + ".token.sig"; public static final String HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE = HCAT_KEY_OUTPUT_BASE + ".jobclient.token.sig"; public static final String HCAT_KEY_JOBCLIENT_TOKEN_STRFORM = HCAT_KEY_OUTPUT_BASE + ".jobclient.token.strform"; + public static final String HCAT_KEY_INPUT_JOB_INFO = "mapreduce.lib.hcat.input.job.info"; + public static final String HCAT_KEY_OUTPUT_JOB_INFO = "mapreduce.lib.hcat.output.job.info"; + public static final String HCAT_KEY_BASE_RANDOM_ACCESS = "mapreduce.lib.hcat.base.random_access"; + public static final String HCAT_KEY_BASE_SERDE = "mapreduce.lib.hcat.base.serde"; public static final String[] OUTPUT_CONFS_TO_SAVE = { HCAT_KEY_OUTPUT_INFO, Index: storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseRandomAccess.java =================================================================== --- storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseRandomAccess.java (revision 0) +++ storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseRandomAccess.java (revision 0) @@ -0,0 +1,311 @@ +package org.apache.hcatalog.hbase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.conf.HiveConf; 
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hcatalog.cli.HCatDriver; +import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer; +import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.common.HCatUtil; +import org.apache.hcatalog.data.HCatRecord; +import org.apache.hcatalog.mapreduce.RandomAccess; +import org.apache.hcatalog.data.schema.HCatFieldSchema; +import org.apache.hcatalog.data.schema.HCatSchema; +import org.apache.hcatalog.hbase.snapshot.RevisionManager; +import org.apache.hcatalog.hbase.snapshot.Transaction; +import org.apache.hcatalog.mapreduce.HCatInputFormat; +import org.apache.hcatalog.mapreduce.HCatRandomAccess; +import org.apache.hcatalog.mapreduce.InputJobInfo; +import org.junit.Test; + +import java.io.IOException; +import java.net.URI; +import java.util.Map; +import java.util.List; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Arrays; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TestHBaseRandomAccess extends SkeletonHBaseTest { + + private static Log LOG = LogFactory.getLog(TestHBaseRandomAccess.class); + + private static HiveConf hcatConf; + private static HCatDriver hcatDriver; + private static Path mapReduceOutputDir; + + private static final String FAMILY = "family"; + private static final String KEY_COLUMN = "in_numbers"; + private static final String VAL_COLUMN = "in_text"; + + private static final String CONFIG_KEY_CHECK_FOR_DIRTY_READS = "CHECK_FOR_DIRTY_READS"; + private static final String CONFIG_KEY_DATABASE_NAME = "DATABASE_NAME"; + private static final String CONFIG_KEY_TABLE_NAME = "TABLE_NAME"; + + private static final String DIRTY_READ_VALUES = "Newer values."; + + + private HiveConf createHCatConfig() throws Exception { + HiveConf hcatConfig = getHiveConf(); + hcatConfig.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname, + HCatSemanticAnalyzer.class.getName()); + URI fsuri = getFileSystem().getUri(); + Path whPath = new Path(fsuri.getScheme(), fsuri.getAuthority(), + getTestDir()); + hcatConfig.set(HiveConf.ConfVars.HADOOPFS.varname, fsuri.toString()); + hcatConfig.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, whPath.toString()); + + // Copy hbase properties. 
+ for (Map.Entry el : getHbaseConf()) { + if (el.getKey().startsWith("hbase.")) { + hcatConfig.set(el.getKey(), el.getValue()); + } + } + return hcatConfig; + } + + private void setUpMapReduceOutputDir(Path mapReduceOutputDir) throws Exception { + FileSystem fs = getFileSystem(); + if (fs.exists(mapReduceOutputDir)) + assertTrue("Could not clear the map-reduce output directory, for test.", + fs.delete(mapReduceOutputDir, true)); + } + + private void setUp() throws Exception { + hcatConf = createHCatConfig(); + SessionState.start(new CliSessionState(hcatConf)); + hcatDriver = new HCatDriver(); + mapReduceOutputDir = new Path(getTestDir(), "mapred/testHbaseTableMRRead"); + setUpMapReduceOutputDir(mapReduceOutputDir); + } + + private static class ToText { + public ToText(int i) { this.i = i; } + public String toString() { return strings[i%strings.length]; } + + private static final String strings[] = { + "Zero", "One", "Two", "Three", "Four", + "Five", "Six", "Seven", "Eight", "Nine" + }; + + private int i; + } + + private List generatePuts(String tableName, int nRecords, String valueToSetForRecords) throws IOException { + List columnFamilies = Arrays.asList(FAMILY); + RevisionManager rm = null; + List myPuts; + try { + rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(getHbaseConf()); + rm.open(); + Transaction tsx = rm.beginWriteTransaction(tableName, columnFamilies); + long revisionNumber = tsx.getRevisionNumber(); + rm.commitWriteTransaction(tsx); + + myPuts = new ArrayList(); + for (int i = 0; i < nRecords; ++i) { + Put put = new Put(Bytes.toBytes("" + i)); + put.add(Bytes.toBytes(FAMILY), Bytes.toBytes(VAL_COLUMN), revisionNumber, + Bytes.toBytes(valueToSetForRecords.equals("")? new ToText(i).toString() : valueToSetForRecords)); + myPuts.add(put); + } + } finally { + if (rm != null) + rm.close(); + } + return myPuts; + } + + private void populateHBaseTable(String tableName, int nRecords, String valueToSetForRecords) throws IOException { + List myPuts = generatePuts(tableName, nRecords, valueToSetForRecords); + HTable table = new HTable(getHbaseConf(), Bytes.toBytes(tableName)); + table.put(myPuts); + } + + private void createHCatTable(String databaseName, String tableName, String databaseDir) throws IOException { + String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + + databaseDir + "'"; + CommandProcessorResponse responseOne = hcatDriver.run(dbquery); + assertEquals("Could not create database in HCat, with command:\t" + dbquery, 0, responseOne.getResponseCode()); + + String tableQuery = "CREATE TABLE " + databaseName + "." + tableName + + "(" + KEY_COLUMN + " string, " + VAL_COLUMN + " string) STORED BY " + + "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" + + "TBLPROPERTIES ('hbase.columns.mapping'=':key," + FAMILY + ":" + VAL_COLUMN + "')"; + CommandProcessorResponse responseTwo = hcatDriver.run(tableQuery); + assertEquals("Could not create the table in HCat, with command:\t" + tableQuery, 0, responseTwo.getResponseCode()); + + // Check that the HBaseTable now exists. + LOG.info("Checking if HBase table " + databaseName + "." + tableName + " exists: " + new HBaseAdmin(getHbaseConf()).tableExists(databaseName + "." + tableName)); + assertTrue("HCat table has been created. HBase table must now exist.", + new HBaseAdmin(getHbaseConf()).tableExists(databaseName + "." 
+ tableName)); + } + + private Configuration getConfig() throws Exception { + Configuration configuration = new Configuration(hcatConf); + configuration.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(getHiveConf().getAllProperties())); + return configuration; + } + + private void tearDownTest(String databaseName, String tableName) throws Exception { + + String hBaseTableName = databaseName + "." + tableName; + + assertEquals("Could not drop table, during test-tear-down.", + 0, hcatDriver.run("DROP TABLE " + hBaseTableName).getResponseCode()); + + assertFalse("Dropping table during tearDown should have deleted table in HBase.", + new HBaseAdmin(getHbaseConf()).tableExists(hBaseTableName)); + + assertEquals("Could not drop database, during test tearDown.", + 0, hcatDriver.run("DROP DATABASE " + databaseName).getResponseCode()); + + } + + private Job createSelfJoinJob(Configuration config, String databaseName, String tableName, boolean checkForDirtyReads) throws Exception { + setUpMapReduceOutputDir(mapReduceOutputDir); + + config.set(CONFIG_KEY_DATABASE_NAME, databaseName); + config.set(CONFIG_KEY_TABLE_NAME, tableName); + config.setBoolean(CONFIG_KEY_CHECK_FOR_DIRTY_READS, checkForDirtyReads); + + Job job = new Job(config, "hBase-self-hash-join"); + job.setJarByClass(this.getClass()); + job.setMapperClass(SelfJoinMapper.class); + + job.setInputFormatClass(HCatInputFormat.class); + InputJobInfo inputJobInfo = InputJobInfo.create(databaseName, tableName, null); + HCatInputFormat.setInput(job, inputJobInfo); + job.setOutputFormatClass(TextOutputFormat.class); + TextOutputFormat.setOutputPath(job, mapReduceOutputDir); + job.setMapOutputKeyClass(BytesWritable.class); + job.setMapOutputValueClass(Text.class); + job.setOutputKeyClass(BytesWritable.class); + job.setOutputValueClass(Text.class); + job.setNumReduceTasks(0); + + return job; + } + + private void submitJob(Job job) throws Exception { + assertTrue("Self-join job should have completed successfully.", job.waitForCompletion(true)); + } + + @Test + public void TestHBaseTableReadMR() throws Exception { + + try { + setUp(); + + // Unique names for database and table. + String databaseName = newTableName("mydatabase"); + String tableName = newTableName("mytable"); + String hBaseTableName = databaseName + "." + tableName; + String databaseDir = getTestDir() + "/hbasedb"; + + createHCatTable(databaseName, tableName, databaseDir); + + populateHBaseTable(hBaseTableName, 20, ""); + + Configuration config = getConfig(); + for (Map.Entry entry : config) + LOG.info("From Front-end : \t\t" + entry.getKey() + "\t\t:\t\t" + entry.getValue()); + + HCatFieldSchema keyField = new HCatFieldSchema(KEY_COLUMN, HCatFieldSchema.Type.STRING, ""); + HCatFieldSchema valField = new HCatFieldSchema(VAL_COLUMN, HCatFieldSchema.Type.STRING, ""); + HCatSchema projectionSchema = new HCatSchema(Arrays.asList(keyField, valField)); + + Job job = createSelfJoinJob(config, databaseName, tableName, false); + + HashMap properties = new HashMap(); + properties.put(HCatConstants.HCAT_KEY_HIVE_CONF, config.get(HCatConstants.HCAT_KEY_HIVE_CONF)); + RandomAccess.addRandomAccess("myRandomAccess", HCatRandomAccess.class, + HCatRandomAccess.createProperties(databaseName, tableName, projectionSchema, properties, job.getConfiguration()), job.getConfiguration()); + + submitJob(job);// No check for dirty reads, on first go. + + // Snapshot has been taken. Add new revisions that shouldn't turn up in random-read. 
+ populateHBaseTable(hBaseTableName, 20, DIRTY_READ_VALUES); + submitJob(job); // Now check for dirty reads. + + tearDownTest(databaseName, tableName); + } + catch(Throwable t) { + LOG.error("Unexpected exception: ", t); + assertTrue("Unexpected exception " + t.getMessage(), false); + } + } + + static class SelfJoinMapper + extends Mapper, Text> { + + private static RandomAccess randomAccess; + private static HCatSchema projectionSchema; + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + try { + HCatFieldSchema keyField = new HCatFieldSchema(KEY_COLUMN, HCatFieldSchema.Type.STRING, ""); + HCatFieldSchema valField = new HCatFieldSchema(VAL_COLUMN, HCatFieldSchema.Type.STRING, ""); + projectionSchema = new HCatSchema(Arrays.asList(keyField, valField)); + Configuration configuration = context.getConfiguration(); + randomAccess = RandomAccess.getRandomAccess("myRandomAccess", configuration); + LOG.info("Random-read from setup: (Can't be dirty): " + randomAccess.get("0")); + } + catch(Throwable t) { + LOG.error("Couldn't set up: ", t); + throw new IOException(t); + } + super.setup(context); + } + + @Override + public void map(ImmutableBytesWritable key, HCatRecord value, + Context context) throws IOException, InterruptedException { + try { + LOG.info("HCat record value:" + value.toString()); + LOG.info("in_numbers: " + value.get(KEY_COLUMN, projectionSchema)); + LOG.info("in_text: " + value.get(VAL_COLUMN, projectionSchema)); + + LOG.info("Random-read From map: (Can't be dirty)" + randomAccess.get("0")); + + if (context.getConfiguration().getBoolean(CONFIG_KEY_CHECK_FOR_DIRTY_READS, false)) { + assertFalse("Dirty read!!", + ((String)(randomAccess.get((String) value.get(KEY_COLUMN, projectionSchema)) + .get(VAL_COLUMN, projectionSchema))).contains(DIRTY_READ_VALUES)); + } + else { + LOG.info("after join: " + randomAccess.get((String) value.get(KEY_COLUMN, projectionSchema)) + .get(VAL_COLUMN, projectionSchema)); + assertTrue("The value received in the map must match the value from from the random-access.", + value.get(VAL_COLUMN, projectionSchema).equals( + randomAccess.get((String) value.get(KEY_COLUMN, projectionSchema)).get( + VAL_COLUMN, projectionSchema))); + } + } catch (Throwable t) { + LOG.error("Unexpected Exception: ", t); + assertTrue("Unexpected Exception.", false); + } + } + } +} Index: storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRandomAccess.java =================================================================== --- storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRandomAccess.java (revision 0) +++ storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRandomAccess.java (revision 0) @@ -0,0 +1,164 @@ +package org.apache.hcatalog.hbase; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hive.hbase.HBaseSerDe; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.common.HCatUtil; +import org.apache.hcatalog.mapreduce.RandomAccess; +import org.apache.hcatalog.mapreduce.RandomAccessCommitter; +import org.apache.hcatalog.hbase.snapshot.FamilyRevision; +import 
org.apache.hcatalog.hbase.snapshot.RevisionManager; +import org.apache.hcatalog.mapreduce.InputJobInfo; +import org.apache.hcatalog.mapreduce.OutputJobInfo; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class HBaseRandomAccess extends RandomAccess { + + private static Log LOG = LogFactory.getLog(HBaseRandomAccess.class); + + private InputJobInfo inputJobInfo; + private OutputJobInfo outputJobInfo; + private Configuration config; + + private String filteredHBaseColumnMappingString; + private ArrayList filteredHBaseColumnFamilies = new ArrayList(); + private ArrayList filteredHBaseColumnQualifiers = new ArrayList(); + + private HTable hBaseTable; + + private SnapshotFilter snapshotFilter; + + public static final String CONF_KEY_HBASE_CONFIGURATION = "mapreduce.lib.hcat.hbase.random_access.configuration"; + + @Override + public void initialize(Map properties) throws IOException { + String inputJobInfoString = properties.get(HCatConstants.HCAT_KEY_INPUT_JOB_INFO); + if (inputJobInfoString == null) + throw new IllegalStateException("Couldn't retrieve InputJobInfo."); + this.inputJobInfo = (InputJobInfo) HCatUtil.deserialize(inputJobInfoString); + + String outputJobInfoString = properties.get(HCatConstants.HCAT_KEY_OUTPUT_JOB_INFO); + if (outputJobInfoString == null) + throw new IllegalStateException("Couldn't retrieve OutputJobInfo."); + this.outputJobInfo = (OutputJobInfo)HCatUtil.deserialize(outputJobInfoString); + + this.config = createConfig(properties); + + processColumnMappingString(); + this.inputJobInfo.getTableInfo().getStorerInfo().getProperties() + .setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING, filteredHBaseColumnMappingString); + properties.put(HCatConstants.HCAT_KEY_INPUT_JOB_INFO, HCatUtil.serialize(this.inputJobInfo)); + + String hBaseTableName = HBaseHCatStorageHandler.getFullyQualifiedName(this.inputJobInfo.getTableInfo()); + this.hBaseTable = new HTable(this.config, hBaseTableName); + + this.snapshotFilter = new SnapshotFilter(inputJobInfo, toStringList(filteredHBaseColumnFamilies), config); + + } + + private static ArrayList toStringList(ArrayList byteArrayList) { + ArrayList stringList = new ArrayList(byteArrayList.size()); + for (byte[] bytes : byteArrayList) + stringList.add(Bytes.toString(bytes)); + return stringList; + } + + private static Configuration createConfig(Map properties) throws IOException { + Configuration config = new Configuration(); + + String configPropertiesString = properties.get(CONF_KEY_HBASE_CONFIGURATION); + if (configPropertiesString == null) + throw new IllegalStateException("Could not retrieve HBase-configuration."); + + HashMap hBaseProperties = (HashMap)HCatUtil.deserialize(configPropertiesString); + for (Map.Entry entry : hBaseProperties.entrySet()) + config.set(entry.getKey(), entry.getValue()); + + return config; + } + + private void processColumnMappingString() throws IOException { + + String columnMappingString = inputJobInfo.getTableInfo().getStorerInfo().getProperties() + .getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING); + if (columnMappingString == null || columnMappingString.equals("")) + throw new IOException("Could not retrieve hbase column-mapping-string."); + + ArrayList columnQualifiers = new ArrayList(); + ArrayList columnFamilies = new ArrayList(); + HBaseUtil.parseColumnMapping(columnMappingString, columnFamilies, null, columnQualifiers, null); + + // Get 
positions of fields in the Projection-schema, within the table-schema. + List columnPositions = new ArrayList(); + for (String columnName : outputJobInfo.getOutputSchema().getFieldNames()) + columnPositions.add(inputJobInfo.getTableInfo().getDataColumns().getPosition(columnName)); + + // For chosen positions, add the corresponding hbase-columns into string. + StringBuilder filteredColumnMappingString = new StringBuilder(); + for (Integer columnPosition : columnPositions) { + String columnFamily = columnFamilies.get(columnPosition); + String columnQualifier = columnQualifiers.get(columnPosition); + if (columnQualifier == null) columnQualifier = ""; + if (!columnFamily.equals(HBaseSerDe.HBASE_KEY_COL)) { + filteredHBaseColumnFamilies.add(Bytes.toBytes(columnFamily)); + filteredHBaseColumnQualifiers.add(Bytes.toBytes(columnQualifier)); + filteredColumnMappingString.append(columnFamily) + .append(":") + .append(columnQualifier) + .append(","); + } + } + + filteredHBaseColumnMappingString = filteredColumnMappingString.toString(); + if (LOG.isDebugEnabled()) { + LOG.debug("Filtered column mapping string: " + filteredColumnMappingString); + for (int i=0; i( - uniqueColumnFamilies)); - } + rm.createTable(tableName, new ArrayList(uniqueColumnFamilies)); } catch (MasterNotRunningException mnre) { throw new MetaException(StringUtils.stringifyException(mnre)); @@ -409,7 +397,7 @@ } } - private String getHBaseTableName(Table tbl) { + private String getFullyQualifiedHBaseTableName(Table tbl) { String tableName = tbl.getParameters().get(HBaseSerDe.HBASE_TABLE_NAME); if (tableName == null) { tableName = tbl.getSd().getSerdeInfo().getParameters() @@ -421,10 +409,28 @@ } else { tableName = tbl.getDbName() + "." + tbl.getTableName(); } + tableName = tableName.toLowerCase(); } return tableName; } + static String getFullyQualifiedHBaseTableName(HCatTableInfo tableInfo){ + String qualifiedName = tableInfo.getStorerInfo().getProperties() + .getProperty(HBaseSerDe.HBASE_TABLE_NAME); + if (qualifiedName == null) { + String databaseName = tableInfo.getDatabaseName(); + String tableName = tableInfo.getTableName(); + if ((databaseName == null) + || (databaseName.equals(MetaStoreUtils.DEFAULT_DATABASE_NAME))) { + qualifiedName = tableName; + } else { + qualifiedName = databaseName + "." + tableName; + } + qualifiedName = qualifiedName.toLowerCase(); + } + return qualifiedName; + } + @Override public Class getInputFormatClass() { return HBaseInputFormat.class; @@ -449,6 +455,10 @@ return HBaseSerDe.class; } + public Configuration getJobConf() { + return jobConf; + } + @Override public Configuration getConf() { @@ -460,15 +470,23 @@ @Override public void setConf(Configuration conf) { - //Not cloning as we want to set tmpjars on it. Putting in jobProperties does not - //get propagated to JobConf in case of InputFormat as they are maintained per partition. - //Also we need to add hbase delegation token to the Credentials. - hbaseConf = conf; + //setConf is called both during DDL operations and mapred read/write jobs. + //Creating a copy of conf for DDL and adding hbase-default and hbase-site.xml to it. + //For jobs, maintaining a reference instead of cloning as we need to + // 1) add hbase delegation token to the Credentials. + // 2) set tmpjars on it. Putting in jobProperties does not get propagated to JobConf + // in case of InputFormat as they are maintained per partition. + //Not adding hbase-default.xml and hbase-site.xml to jobConf as it will override any + //hbase properties set in the JobConf by the user. 
In configureInputJobProperties and + //configureOutputJobProperties, we take care of adding the default properties + //that are not already present. TODO: Change to a copy for jobs after HCAT-308 is fixed. + jobConf = conf; + hbaseConf = HBaseConfiguration.create(conf); } private void checkDeleteTable(Table table) throws MetaException { boolean isExternal = MetaStoreUtils.isExternalTable(table); - String tableName = getHBaseTableName(table); + String tableName = getFullyQualifiedHBaseTableName(table); RevisionManager rm = null; try { if (!isExternal && getHBaseAdmin().tableExists(tableName)) { @@ -478,12 +496,9 @@ } getHBaseAdmin().deleteTable(tableName); - //Set up znodes in revision manager. + //Drop table in revision manager. rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(hbaseConf); - if (rm instanceof ZKBasedRevisionManager) { - ZKBasedRevisionManager zkRM = (ZKBasedRevisionManager) rm; - zkRM.deleteZNodes(tableName); - } + rm.dropTable(tableName); } } catch (IOException ie) { throw new MetaException(StringUtils.stringifyException(ie)); @@ -492,26 +507,12 @@ } } - static String getFullyQualifiedName(HCatTableInfo tableInfo){ - String qualifiedName; - String databaseName = tableInfo.getDatabaseName(); - String tableName = tableInfo.getTableName(); - - if ((databaseName == null) || (databaseName.equals(MetaStoreUtils.DEFAULT_DATABASE_NAME))) { - qualifiedName = tableName; - } else { - qualifiedName = databaseName + "." + tableName; - } - - return qualifiedName; - } - /** * Helper method for users to add the required depedency jars to distributed cache. * @param conf * @throws IOException */ - private void addOutputDependencyJars(Configuration conf) throws IOException { + static void addOutputDependencyJars(Configuration conf) throws IOException { TableMapReduceUtil.addDependencyJars(conf, //ZK ZooKeeper.class, @@ -556,7 +557,7 @@ public static boolean isBulkMode(OutputJobInfo outputJobInfo) { //Default is false String bulkMode = outputJobInfo.getTableInfo().getStorerInfo().getProperties() - .getProperty(HBaseConstants.PROPERTY_OSD_BULK_MODE_KEY, + .getProperty(HBaseConstants.PROPERTY_BULK_OUTPUT_MODE_KEY, "false"); return "true".equals(bulkMode); } @@ -564,7 +565,7 @@ private String getScanColumns(HCatTableInfo tableInfo, String outputColSchema) throws IOException { StringBuilder builder = new StringBuilder(); String hbaseColumnMapping = tableInfo.getStorerInfo().getProperties() - .getProperty(HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY); + .getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING); if (outputColSchema == null) { String[] splits = hbaseColumnMapping.split("[,]"); for (int i = 0; i < splits.length; i++) { @@ -603,5 +604,53 @@ builder.deleteCharAt(builder.length() - 1); return builder.toString(); } + + // Implementation of the RandomAccessible interface. + @Override + public void configureRandomAccess(Map jobProperties, Map randomAccessProperties) { + + String inputJobInfoString = randomAccessProperties.get(HCatConstants.HCAT_KEY_INPUT_JOB_INFO); + try { + InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(inputJobInfoString); + + HCatTableInfo tableInfo = inputJobInfo.getTableInfo(); + String qualifiedTableName = HBaseHCatStorageHandler.getFullyQualifiedName(tableInfo); + + Configuration jobConf = getConf(); + Configuration copyOfConf = new Configuration(jobConf); + HBaseConfiguration.addHbaseResources(copyOfConf); + + // Get Delegation tokens for HBase. (Done before taking snapshots.) 
+ // Note: Delegation Tokens will be retrieved again, when the RandomAccess is initialized. + if (jobConf instanceof JobConf) { + HBaseUtil.addHBaseDelegationToken((JobConf)jobConf); + } + + String serSnapshot = (String) inputJobInfo.getProperties().get( + HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY); + + if (serSnapshot == null) { + HCatTableSnapshot snapshot = HBaseRevisionManagerUtil.createSnapshot(copyOfConf, + qualifiedTableName, tableInfo); + inputJobInfo.getProperties().put(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY, + HCatUtil.serialize(snapshot)); + + randomAccessProperties.put(HCatConstants.HCAT_KEY_INPUT_JOB_INFO, HCatUtil.serialize(inputJobInfo)); + } + + HashMap jobConfProperties = new HashMap(); + for (Map.Entry entry : copyOfConf) + jobConfProperties.put(entry.getKey(), entry.getValue()); + randomAccessProperties.put(HBaseRandomAccess.CONF_KEY_HBASE_CONFIGURATION, HCatUtil.serialize(jobConfProperties)); + + } catch (IOException e) { + throw new IllegalStateException("Error while configuring job properties", e); + } + } + + @Override + public Class getRandomAccessClass() { + return HBaseRandomAccess.class; + } } Index: storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java =================================================================== --- storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java (revision 1306669) +++ storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java (working copy) @@ -19,17 +19,12 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; -import java.util.Map; -import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -39,10 +34,6 @@ import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.mapred.RecordReader; -import org.apache.hcatalog.common.HCatUtil; -import org.apache.hcatalog.hbase.snapshot.FamilyRevision; -import org.apache.hcatalog.hbase.snapshot.RevisionManager; -import org.apache.hcatalog.hbase.snapshot.TableSnapshot; import org.apache.hcatalog.mapreduce.InputJobInfo; /** @@ -54,24 +45,17 @@ static final Log LOG = LogFactory.getLog(HbaseSnapshotRecordReader.class); private final InputJobInfo inpJobInfo; private final Configuration conf; - private final int maxRevisions = 1; private ResultScanner scanner; private Scan scan; private HTable htable; - private TableSnapshot snapshot; private Iterator resultItr; - private Set allAbortedTransactions; private DataOutputBuffer valueOut = new DataOutputBuffer(); private DataInputBuffer valueIn = new DataInputBuffer(); + private SnapshotFilter snapshotFilter; HbaseSnapshotRecordReader(InputJobInfo inputJobInfo, Configuration conf) throws IOException { this.inpJobInfo = inputJobInfo; this.conf = conf; - String snapshotString = conf.get(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY); - HCatTableSnapshot hcatSnapshot = (HCatTableSnapshot) HCatUtil - .deserialize(snapshotString); - this.snapshot = HBaseRevisionManagerUtil.convertSnapshot(hcatSnapshot, - inpJobInfo.getTableInfo()); } public void init() throws IOException { @@ -79,56 +63,19 @@ } public void restart(byte[] 
firstRow) throws IOException { - allAbortedTransactions = getAbortedTransactions(Bytes.toString(htable.getTableName()), scan); - long maxValidRevision = getMaximumRevision(scan, snapshot); - while (allAbortedTransactions.contains(maxValidRevision)) { - maxValidRevision--; - } + + this.snapshotFilter = new SnapshotFilter(inpJobInfo, toStringList(scan.getFamilies()), conf); + Scan newScan = new Scan(scan); newScan.setStartRow(firstRow); //TODO: See if filters in 0.92 can be used to optimize the scan //TODO: Consider create a custom snapshot filter //TODO: Make min revision a constant in RM - newScan.setTimeRange(0, maxValidRevision + 1); - newScan.setMaxVersions(); + snapshotFilter.applyConstraints(newScan); this.scanner = this.htable.getScanner(newScan); resultItr = this.scanner.iterator(); } - private Set getAbortedTransactions(String tableName, Scan scan) throws IOException { - Set abortedTransactions = new HashSet(); - RevisionManager rm = null; - try { - rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf); - byte[][] families = scan.getFamilies(); - for (byte[] familyKey : families) { - String family = Bytes.toString(familyKey); - List abortedWriteTransactions = rm.getAbortedWriteTransactions( - tableName, family); - if (abortedWriteTransactions != null) { - for (FamilyRevision revision : abortedWriteTransactions) { - abortedTransactions.add(revision.getRevision()); - } - } - } - return abortedTransactions; - } finally { - HBaseRevisionManagerUtil.closeRevisionManagerQuietly(rm); - } - } - - private long getMaximumRevision(Scan scan, TableSnapshot snapshot) { - long maxRevision = 0; - byte[][] families = scan.getFamilies(); - for (byte[] familyKey : families) { - String family = Bytes.toString(familyKey); - long revision = snapshot.getRevision(family); - if (revision > maxRevision) - maxRevision = revision; - } - return maxRevision; - } - /* * @param htable The HTable ( of HBase) to use for the record reader. * @@ -144,6 +91,13 @@ public void setScan(Scan scan) { this.scan = scan; } + + private static List toStringList(byte[][] columnFamilies) { + List list = new ArrayList(columnFamilies.length); + for (byte[] family : columnFamilies) + list.add(Bytes.toString(family)); + return list; + } @Override public ImmutableBytesWritable createKey() { @@ -176,7 +130,7 @@ } else { while (resultItr.hasNext()) { Result temp = resultItr.next(); - Result hbaseRow = prepareResult(temp.list()); + Result hbaseRow = snapshotFilter.filter(temp); if (hbaseRow != null) { // Update key and value. Currently no way to avoid serialization/de-serialization // as no setters are available. 
@@ -193,55 +147,6 @@ return false; } - private Result prepareResult(List keyvalues) { - - List finalKeyVals = new ArrayList(); - Map> qualValMap = new HashMap>(); - for (KeyValue kv : keyvalues) { - byte[] cf = kv.getFamily(); - byte[] qualifier = kv.getQualifier(); - String key = Bytes.toString(cf) + ":" + Bytes.toString(qualifier); - List kvs; - if (qualValMap.containsKey(key)) { - kvs = qualValMap.get(key); - } else { - kvs = new ArrayList(); - } - - String family = Bytes.toString(kv.getFamily()); - //Ignore aborted transactions - if (allAbortedTransactions.contains(kv.getTimestamp())) { - continue; - } - - long desiredTS = snapshot.getRevision(family); - if (kv.getTimestamp() <= desiredTS) { - kvs.add(kv); - } - qualValMap.put(key, kvs); - } - - Set keys = qualValMap.keySet(); - for (String cf : keys) { - List kvs = qualValMap.get(cf); - if (maxRevisions <= kvs.size()) { - for (int i = 0; i < maxRevisions; i++) { - finalKeyVals.add(kvs.get(i)); - } - } else { - finalKeyVals.addAll(kvs); - } - } - - if(finalKeyVals.size() == 0){ - return null; - } else { - KeyValue[] kvArray = new KeyValue[finalKeyVals.size()]; - finalKeyVals.toArray(kvArray); - return new Result(kvArray); - } - } - /* * @see org.apache.hadoop.hbase.mapred.TableRecordReader#close() */ Index: storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRandomAccessCommitter.java =================================================================== --- storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRandomAccessCommitter.java (revision 0) +++ storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRandomAccessCommitter.java (revision 0) @@ -0,0 +1,47 @@ +package org.apache.hcatalog.hbase; + +import com.facebook.fb303.FacebookBase; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hive.hbase.HBaseSerDe; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hcatalog.mapreduce.RandomAccessCommitter; +import org.apache.hcatalog.mapreduce.HCatOutputFormat; +import org.apache.thrift.TBase; +import org.apache.zookeeper.ZooKeeper; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; + + +import java.io.IOException; + +public class HBaseRandomAccessCommitter implements RandomAccessCommitter { + + @Override + public void prepareJob(JobContext jobContext) throws IOException { + Configuration config = jobContext.getConfiguration(); + HBaseConfiguration.addHbaseResources(config); + if (config instanceof JobConf) + HBaseUtil.addHBaseDelegationToken((JobConf)config); + + HBaseHCatStorageHandler.addOutputDependencyJars(jobContext.getConfiguration()); + } + + @Override + public void setupJob(JobContext jobContext) throws IOException { + + } + + @Override + public void commitJob(JobContext jobContext) throws IOException { + } + + @Override + public void abortJob(JobContext jobContext) throws IOException { + } +} Index: storage-handlers/hbase/src/gen-java/org/apache/hcatalog/hbase/SnapshotFilter.java =================================================================== --- storage-handlers/hbase/src/gen-java/org/apache/hcatalog/hbase/SnapshotFilter.java (revision 0) +++ 
storage-handlers/hbase/src/gen-java/org/apache/hcatalog/hbase/SnapshotFilter.java (revision 0) @@ -0,0 +1,124 @@ +package org.apache.hcatalog.hbase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hcatalog.common.HCatUtil; +import org.apache.hcatalog.hbase.snapshot.FamilyRevision; +import org.apache.hcatalog.hbase.snapshot.RevisionManager; +import org.apache.hcatalog.hbase.snapshot.TableSnapshot; +import org.apache.hcatalog.mapreduce.InputJobInfo; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class SnapshotFilter { + + private static final int MAX_NUM_REVISIONS = 1; // TODO: Make configurable? + + private Configuration config; + private TableSnapshot snapshot; + private String hBaseTableName; + private List columnFamilies; + private Set abortedTransactions; + private Long maxValidRevision; + + public SnapshotFilter(InputJobInfo inputJobInfo, List columnFamilies, Configuration config) throws IOException { + this.config = config; + this.snapshot = HBaseRevisionManagerUtil.convertSnapshot(retrieveHCatTableSnapshot(inputJobInfo), + inputJobInfo.getTableInfo()); + this.hBaseTableName = HBaseHCatStorageHandler.getFullyQualifiedName(inputJobInfo.getTableInfo()); + this.columnFamilies = columnFamilies; + this.abortedTransactions = getAbortedTransactions(); + this.maxValidRevision = snapshot.getLatestRevision(); + while (abortedTransactions.contains(maxValidRevision)) + --maxValidRevision; + } + + private HCatTableSnapshot retrieveHCatTableSnapshot(InputJobInfo inputJobInfo) throws IOException { + String snapshotString = inputJobInfo.getProperties().getProperty(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY); + if (snapshotString == null) + snapshotString = config.get(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY); + + if (snapshotString == null) + throw new IOException("Could not retrieve snapshot-string!"); + + return (HCatTableSnapshot) HCatUtil.deserialize(snapshotString); + } + + private Set getAbortedTransactions() throws IOException { + Set abortedTransactions = new HashSet(); + RevisionManager rm = null; + try { + rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(config); + for (String family : columnFamilies) { + List abortedWriteTransactions = rm.getAbortedWriteTransactions( + hBaseTableName, family); + if (abortedWriteTransactions != null) { + for (FamilyRevision revision : abortedWriteTransactions) { + abortedTransactions.add(revision.getRevision()); + } + } + } + + return abortedTransactions; + } finally { + HBaseRevisionManagerUtil.closeRevisionManagerQuietly(rm); + } + } + + public Get applyConstraints(Get get) throws IOException { + get.setTimeRange(0, maxValidRevision + 1); + get.setMaxVersions(); + return get; + } + + public Scan applyConstraints(Scan scan) throws IOException { + scan.setTimeRange(0, maxValidRevision + 1); + scan.setMaxVersions(); + return scan; + } + + public Result filter(Result rawResult) { + + // Find "qualifying" KeyValues and group by column ("family":"qualifier"). + // 1. KeyValues can't be newer than the snapshot allows for its column-family. + // 2. KeyValues shouldn't be in the aborted list either. + // 3. Don't return more than MAX_NUM_REVISIONS KeyValues for each column. 
+ HashMap> column_Vs_KeyValue = new HashMap>(); + for (KeyValue rawKeyValue : rawResult.list()) { + + if (abortedTransactions.contains(rawKeyValue.getTimestamp())) + continue; // Skip this column-value. + + if (rawKeyValue.getTimestamp() <= snapshot.getRevision(Bytes.toString(rawKeyValue.getFamily()))) { + // Current KeyValue falls in snapshot-range. Of interest in final output. + String column = Bytes.toString(rawKeyValue.getFamily()) + + ":" + Bytes.toString(rawKeyValue.getQualifier()); + + List keyValues = column_Vs_KeyValue.containsKey(column)? + column_Vs_KeyValue.get(column) : new ArrayList(MAX_NUM_REVISIONS); + + if (keyValues.size() < MAX_NUM_REVISIONS) { + keyValues.add(rawKeyValue); + column_Vs_KeyValue.put(column, keyValues); + } + } + } + + List finalKeyValues = new ArrayList(); + for (Map.Entry> entry : column_Vs_KeyValue.entrySet()) + finalKeyValues.addAll(entry.getValue()); + + return finalKeyValues.size() == 0? null : new Result(finalKeyValues.toArray(new KeyValue[finalKeyValues.size()])); + } + +}
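A few illustrative sketches follow to make the intended usage concrete; none of them are part of the patch, and every name, path and type parameter they introduce is an assumption for illustration only. First, the read path: the body of HBaseRandomAccess.get() is not visible in this copy of the patch, but given the SnapshotFilter API above it presumably amounts to a keyed Get bounded by the job's snapshot, roughly as sketched here (the helper name snapshotGet and its parameters are mine, not the patch's code).

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hcatalog.hbase.SnapshotFilter;

public class SnapshotGetExample {

    // Reads one row, restricted to the projected columns, and filters it down to the
    // committed cells visible as of the job's table snapshot.
    static Result snapshotGet(HTable table, SnapshotFilter snapshotFilter, String key,
                              List<byte[]> families, List<byte[]> qualifiers) throws IOException {
        Get get = new Get(Bytes.toBytes(key));
        for (int i = 0; i < families.size(); i++) {
            get.addColumn(families.get(i), qualifiers.get(i));   // only the projected hbase columns
        }
        // Constrains the Get to the time-range [0, maxValidRevision + 1) and fetches all versions
        // within that range, as applyConstraints() does for the scan-side reader.
        snapshotFilter.applyConstraints(get);

        Result raw = table.get(get);
        if (raw == null || raw.isEmpty()) {
            return null;                                         // nothing stored under this key
        }
        // Drops cells written by aborted transactions or newer than the per-family snapshot
        // revision, keeping at most MAX_NUM_REVISIONS committed cells per column.
        return snapshotFilter.filter(raw);
    }
}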
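Stepping back to the end-to-end wiring, this is a driver-side sketch distilled from TestHBaseRandomAccess above. The database, table, alias, column names and output path are placeholders, raw types are used because the parameterized signatures are not visible in the patch text, and the LookupMapper it references is sketched next.

import java.util.Arrays;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.mapreduce.HCatRandomAccess;
import org.apache.hcatalog.mapreduce.RandomAccess;

public class RandomAccessDriverExample {

    public static Job configureJob(Configuration conf) throws Exception {
        Job job = new Job(conf, "random-access-example");
        job.setJarByClass(RandomAccessDriverExample.class);
        job.setMapperClass(LookupMapper.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // The job's real OutputFormat must be set before addRandomAccess(), which records its
        // class name so the RandomAccessOutputFormat wrapper can delegate to it later.
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("/tmp/random-access-example"));  // placeholder path

        // Columns to project from the HCat table when serving random reads.
        HCatSchema projection = new HCatSchema(Arrays.asList(
                new HCatFieldSchema("in_numbers", HCatFieldSchema.Type.STRING, ""),
                new HCatFieldSchema("in_text", HCatFieldSchema.Type.STRING, "")));

        // Table-level properties; as in the test, the serialized HiveConf is seeded here.
        HashMap properties = new HashMap();
        properties.put(HCatConstants.HCAT_KEY_HIVE_CONF,
                HCatUtil.serialize(HCatUtil.getHiveConf(conf).getAllProperties()));

        // createProperties() resolves the table's storage handler, checks that it is RandomAccessible,
        // lets it take a snapshot, and serializes everything needed to rebuild an HCatRandomAccess on
        // the task side; addRandomAccess() stores all of it in the Configuration under the alias.
        RandomAccess.addRandomAccess("lookupTable", HCatRandomAccess.class,
                HCatRandomAccess.createProperties("mydatabase", "mytable", projection,
                        properties, job.getConfiguration()),
                job.getConfiguration());

        return job;
    }
}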
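The task side is symmetric: a sketch of the hypothetical LookupMapper used above, which re-materializes the registered instance by its alias and performs a keyed lookup per input record, following the pattern of SelfJoinMapper. The input types assume the default TextInputFormat, with one lookup key per line.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.mapreduce.RandomAccess;

public class LookupMapper extends Mapper<LongWritable, Text, Text, Text> {

    private RandomAccess randomAccess;    // raw type, as in the patch text

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Rebuilds the HCatRandomAccess registered in the driver: deserializes the per-alias
        // property map from the Configuration and reflectively constructs and initializes it.
        randomAccess = RandomAccess.getRandomAccess("lookupTable", context.getConfiguration());
    }

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        String key = line.toString().trim();
        // Snapshot-consistent point read; writes made after the snapshot are not visible.
        HCatRecord row = (HCatRecord) randomAccess.get(key);
        if (row != null) {
            context.write(new Text(key), new Text(String.valueOf(row)));
        }
    }
}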
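Finally, for storage-handler authors, the contract a new backend has to satisfy: the handler itself mixes in RandomAccessible (as HBaseHCatStorageHandler does above) and points at a RandomAccess subclass that implements initialize(), get() and getCommitter(). The toy backend below is purely illustrative and not part of the patch; raw types are kept because the parameterized signatures are elided in the patch text.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hcatalog.mapreduce.RandomAccess;
import org.apache.hcatalog.mapreduce.RandomAccessCommitter;

// Toy backend: serves look-ups straight out of the property map it was initialized with.
// A real implementation (e.g. HBaseRandomAccess) would open its store connection here instead.
public class PropertyMapRandomAccess extends RandomAccess {

    private final Map<String, String> data = new HashMap<String, String>();

    @Override
    public void initialize(Map properties) throws IOException {
        // Called in lieu of a constructor, after setConf() if the class is Configurable.
        // Everything needed to serve get() must be recoverable from this property map,
        // because that is all that survives serialization into the job Configuration.
        for (Object key : properties.keySet()) {
            data.put(String.valueOf(key), String.valueOf(properties.get(key)));
        }
    }

    @Override
    public Object get(Object key) throws IOException {
        return data.get(String.valueOf(key));
    }

    @Override
    public RandomAccessCommitter getCommitter(JobContext jobContext) throws IOException {
        // prepareJob() is invoked from RandomAccessOutputFormat.checkOutputSpecs(); the other
        // hooks mirror OutputCommitter. A no-op committer is enough for a read-only backend.
        return new RandomAccessCommitter() {
            public void prepareJob(JobContext ctx) {}
            public void setupJob(JobContext ctx) {}
            public void commitJob(JobContext ctx) {}
            public void abortJob(JobContext ctx) {}
        };
    }
}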