Index: ant/src/org/apache/hadoop/hive/ant/QTestGenTask.java =================================================================== --- ant/src/org/apache/hadoop/hive/ant/QTestGenTask.java (revision 800178) +++ ant/src/org/apache/hadoop/hive/ant/QTestGenTask.java (working copy) @@ -68,6 +68,16 @@ protected String logFile; protected String clusterMode; + + protected String miniHBase; + + public void setMiniHBase(String miniHBase) { + this.miniHBase = miniHBase; + } + + public String getMiniHBase() { + return miniHBase; + } public void setClusterMode(String clusterMode) { this.clusterMode = clusterMode; @@ -262,6 +272,9 @@ if (clusterMode == null) clusterMode = new String(""); + + if (miniHBase == null) + miniHBase = new String("false"); // For each of the qFiles generate the test VelocityContext ctx = new VelocityContext(); @@ -270,6 +283,7 @@ ctx.put("resultsDir", resultsDir); ctx.put("logDir", logDir); ctx.put("clusterMode", clusterMode); + ctx.put("miniHBase", miniHBase); File outFile = new File(outDir, className + ".java"); FileWriter writer = new FileWriter(outFile); Index: build-common.xml =================================================================== --- build-common.xml (revision 800178) +++ build-common.xml (working copy) @@ -135,6 +135,7 @@ + Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java =================================================================== --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 800178) +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy) @@ -30,6 +30,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.security.UserGroupInformation; @@ -41,6 +42,7 @@ public class HiveConf extends Configuration { protected String hiveJar; + protected String hbaseJar; protected Properties origProp; protected String auxJars; private static final Log l4j = LogFactory.getLog(HiveConf.class); @@ -75,6 +77,9 @@ HADOOPJT("mapred.job.tracker", "local"), HADOOPNUMREDUCERS("mapred.reduce.tasks", 1), HADOOPJOBNAME("mapred.job.name", null), + + // hbase stuff + HBASEMASTER("hbase.master", "localhost:60000"), // MetaStore stuff. 
    METASTOREDIRECTORY("hive.metastore.metadb.dir", ""),
@@ -310,6 +315,16 @@
     // preserve the original configuration
     origProp = getUnderlyingProps();
 
+    // try to add the hbase configuration
+    URL hbaseconfurl = getClassLoader().getResource("hbase-default.xml");
+    if (hbaseconfurl != null) {
+      addResource(hbaseconfurl);
+    }
+    URL hbasesiteurl = getClassLoader().getResource("hbase-site.xml");
+    if (hbasesiteurl != null) {
+      addResource(hbasesiteurl);
+    }
+
     // let's add the hive configuration
     URL hconfurl = getClassLoader().getResource("hive-default.xml");
     if(hconfurl == null) {
@@ -345,6 +360,15 @@
     if(auxJars == null) {
       auxJars = this.get(ConfVars.HIVEAUXJARS.varname);
     }
+
+    hbaseJar = (new JobConf(HBaseConfiguration.class)).getJar();
+    if (hbaseJar != null) {
+      if (auxJars == null)
+        auxJars = hbaseJar;
+      else
+        auxJars = hbaseJar + "," + auxJars;
+      this.setAuxJars(auxJars);
+    }
   }
Index: metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (revision 800178)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (working copy)
@@ -26,6 +26,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.generated.master.table_jsp;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -293,21 +294,27 @@
         }
         tblPath = wh.getDnsPath(new Path(tbl.getSd().getLocation()));
       }
-      tbl.getSd().setLocation(tblPath.toString());
 
       // get_table checks whether database exists, it should be moved here
       if(is_table_exists(tbl.getDbName(), tbl.getTableName())) {
         throw new AlreadyExistsException("Table " + tbl.getTableName() + " already exists");
       }
-
-      if(!wh.isDir(tblPath)) {
-        if(!wh.mkdirs(tblPath)) {
-          throw new MetaException (tblPath + " is not a directory or unable to create one");
+
+      // If the hive table is a regular file-backed table, create its directory.
+ if (!(tbl.getSd().getInputFormat() != null && + tbl.getSd().getInputFormat().equals( + "org.apache.hadoop.hive.ql.io.HiveHBaseTableInputFormat"))) { + + if(!wh.isDir(tblPath)) { + if(!wh.mkdirs(tblPath)) { + throw new MetaException (tblPath + " is not a directory or unable to create one"); + } + madeDir = true; } - madeDir = true; + } - + getMS().createTable(tbl); success = getMS().commitTransaction(); Index: ql/build.xml =================================================================== --- ql/build.xml (revision 800178) +++ ql/build.xml (working copy) @@ -76,11 +76,22 @@ queryDirectory="${ql.test.query.clientpositive.dir}" queryFile="${qfile}" clusterMode="${clustermode}" + miniHBase="false" resultsDirectory="${ql.test.results.clientpositive.dir}" className="TestCliDriver" logFile="${test.log.dir}/testclidrivergen.log" logDirectory="${test.log.dir}/clientpositive"/> + + )(HiveSequenceFileOutputFormat.class); - String newFile = hiveScratchDir + File.separator + (++numEmptyPaths); - Path newPath = new Path(newFile); - LOG.info("Changed input file to " + newPath.toString()); - - // toggle the work - LinkedHashMap> pathToAliases = work.getPathToAliases(); - if (isEmptyPath) { - assert path != null; - pathToAliases.put(newPath.toUri().toString(), pathToAliases.get(path)); - pathToAliases.remove(path); + if (outFileFormat == HiveHBaseTableOutputFormat.class) { + FileInputFormat.addInputPaths(job, path); + LOG.info("Add a hbase table " + path); + } else { + String newFile = hiveScratchDir + File.separator + (++numEmptyPaths); + Path newPath = new Path(newFile); + LOG.info("Changed input file to " + newPath.toString()); + + // toggle the work + LinkedHashMap> pathToAliases = work.getPathToAliases(); + if (isEmptyPath) { + assert path != null; + pathToAliases.put(newPath.toUri().toString(), pathToAliases.get(path)); + pathToAliases.remove(path); + } + else { + assert path == null; + ArrayList newList = new ArrayList(); + newList.add(alias); + pathToAliases.put(newPath.toUri().toString(), newList); + } + + work.setPathToAliases(pathToAliases); + + LinkedHashMap pathToPartitionInfo = work.getPathToPartitionInfo(); + if (isEmptyPath) { + pathToPartitionInfo.put(newPath.toUri().toString(), pathToPartitionInfo.get(path)); + pathToPartitionInfo.remove(path); + } + else { + partitionDesc pDesc = work.getAliasToPartnInfo().get(alias).clone(); + Class inputFormat = SequenceFileInputFormat.class; + pDesc.getTableDesc().setInputFileFormatClass(inputFormat); + pathToPartitionInfo.put(newPath.toUri().toString(), pDesc); + } + work.setPathToPartitionInfo(pathToPartitionInfo); + + String onefile = newPath.toString(); + RecordWriter recWriter = outFileFormat.newInstance().getHiveRecordWriter(job, newPath, Text.class, false, new Properties(), null); + recWriter.close(false); + FileInputFormat.addInputPaths(job, onefile); } - else { - assert path == null; - ArrayList newList = new ArrayList(); - newList.add(alias); - pathToAliases.put(newPath.toUri().toString(), newList); - } - - work.setPathToAliases(pathToAliases); - - LinkedHashMap pathToPartitionInfo = work.getPathToPartitionInfo(); - if (isEmptyPath) { - pathToPartitionInfo.put(newPath.toUri().toString(), pathToPartitionInfo.get(path)); - pathToPartitionInfo.remove(path); - } - else { - partitionDesc pDesc = work.getAliasToPartnInfo().get(alias).clone(); - Class inputFormat = SequenceFileInputFormat.class; - pDesc.getTableDesc().setInputFileFormatClass(inputFormat); - pathToPartitionInfo.put(newPath.toUri().toString(), pDesc); - } - 
work.setPathToPartitionInfo(pathToPartitionInfo); - - String onefile = newPath.toString(); - RecordWriter recWriter = outFileFormat.newInstance().getHiveRecordWriter(job, newPath, Text.class, false, new Properties(), null); - recWriter.close(false); - FileInputFormat.addInputPaths(job, onefile); return numEmptyPaths; } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (revision 800178) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (working copy) @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; +import org.apache.hadoop.hive.ql.io.HiveHBaseTableOutputFormat; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.fileSinkDesc; @@ -56,7 +57,8 @@ transient protected Serializer serializer; transient protected BytesWritable commonKey = new BytesWritable(); transient protected TableIdEnum tabIdEnum = null; - transient private LongWritable row_count; + transient private LongWritable row_count; + transient private boolean isHBaseTable; public static enum TableIdEnum { TABLE_ID_1_ROWCOUNT, TABLE_ID_2_ROWCOUNT, TABLE_ID_3_ROWCOUNT, TABLE_ID_4_ROWCOUNT, TABLE_ID_5_ROWCOUNT, TABLE_ID_6_ROWCOUNT, TABLE_ID_7_ROWCOUNT, TABLE_ID_8_ROWCOUNT, TABLE_ID_9_ROWCOUNT, TABLE_ID_10_ROWCOUNT, TABLE_ID_11_ROWCOUNT, TABLE_ID_12_ROWCOUNT, TABLE_ID_13_ROWCOUNT, TABLE_ID_14_ROWCOUNT, TABLE_ID_15_ROWCOUNT; @@ -65,10 +67,14 @@ transient protected boolean autoDelete = false; private void commit() throws IOException { - if (!fs.rename(outPath, finalPath)) { - throw new IOException ("Unable to rename output to: " + finalPath); + if(!isHBaseTable) { + if (!fs.rename(outPath, finalPath)) { + throw new IOException ("Unable to rename output to: " + finalPath); + } + LOG.info("Committed to output file: " + finalPath); + } else { + LOG.info("Committed to HBase Table: " + finalPath.getName()); } - LOG.info("Committed to output file: " + finalPath); } protected void initializeOp(Configuration hconf) throws HiveException { @@ -76,6 +82,7 @@ serializer = (Serializer)conf.getTableInfo().getDeserializerClass().newInstance(); serializer.initialize(null, conf.getTableInfo().getProperties()); + isHBaseTable = conf.getTableInfo().getOutputFileFormatClass() == HiveHBaseTableOutputFormat.class; JobConf jc; if(hconf instanceof JobConf) { @@ -97,11 +104,15 @@ Path tmpPath = Utilities.toTempPath(specPath); String taskId = Utilities.getTaskId(hconf); fs =(new Path(specPath)).getFileSystem(hconf); - finalPath = new Path(tmpPath, taskId); - outPath = new Path(tmpPath, Utilities.toTempPath(taskId)); + if (isHBaseTable) { + outPath = finalPath = new Path(specPath); + LOG.info("Writing to HBase table: FS " + outPath.getName()); + } else { + finalPath = new Path(tmpPath, taskId); + outPath = new Path(tmpPath, Utilities.toTempPath(taskId)); + LOG.info("Writing to temp file: FS " + outPath); + } - LOG.info("Writing to temp file: FS " + outPath); - HiveOutputFormat hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance(); final Class outputClass = serializer.getSerializedClass(); boolean isCompressed = conf.getCompressed(); @@ -116,13 +127,15 @@ outWriter = getRecordWriter(jc, hiveOutputFormat, outputClass, isCompressed, tableInfo.getProperties(), outPath); - // in 
recent hadoop versions, use deleteOnExit to clean tmp files. - try { - Method deleteOnExit = FileSystem.class.getDeclaredMethod("deleteOnExit", new Class [] {Path.class}); - deleteOnExit.setAccessible(true); - deleteOnExit.invoke(fs, outPath); - autoDelete = true; - } catch (Exception e) {} + if (!isHBaseTable) { + // in recent hadoop versions, use deleteOnExit to clean tmp files. + try { + Method deleteOnExit = FileSystem.class.getDeclaredMethod("deleteOnExit", new Class [] {Path.class}); + deleteOnExit.setAccessible(true); + deleteOnExit.invoke(fs, outPath); + autoDelete = true; + } catch (Exception e) {} + } initializeChildren(hconf); } catch (HiveException e) { @@ -180,7 +193,7 @@ // Hadoop always call close() even if an Exception was thrown in map() or reduce(). try { outWriter.close(abort); - if(!autoDelete) + if(!autoDelete && !isHBaseTable) fs.delete(outPath, true); } catch (Exception e) { e.printStackTrace(); @@ -198,7 +211,7 @@ @Override public void jobClose(Configuration hconf, boolean success) throws HiveException { try { - if(conf != null) { + if(conf != null && !isHBaseTable) { String specPath = conf.getDirName(); fs = (new Path(specPath)).getFileSystem(hconf); Path tmpPath = Utilities.toTempPath(specPath); Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveHBaseTableInputFormat.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveHBaseTableInputFormat.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveHBaseTableInputFormat.java (revision 0) @@ -0,0 +1,43 @@ +package org.apache.hadoop.hive.ql.io; + +import java.io.IOException; + +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.io.RowResult; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; + +public class HiveHBaseTableInputFormat + implements InputFormat, JobConfigurable { + + org.apache.hadoop.hbase.mapred.TableInputFormat mInputFormat; + + public HiveHBaseTableInputFormat() { + mInputFormat = new org.apache.hadoop.hbase.mapred.TableInputFormat(); + } + + @Override + public RecordReader getRecordReader(InputSplit split, JobConf job, + Reporter reporter) throws IOException { + return (RecordReader) mInputFormat.getRecordReader(split, job, reporter); + } + + @Override + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + return mInputFormat.getSplits(job, numSplits); + } + + @Override + public void configure(JobConf job) { + mInputFormat.configure(job); + } + + public void validateInput(JobConf job) throws IOException { + mInputFormat.validateInput(job); + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveHBaseTableOutputFormat.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveHBaseTableOutputFormat.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveHBaseTableOutputFormat.java (revision 0) @@ -0,0 +1,63 @@ +package org.apache.hadoop.hive.ql.io; + +import java.io.IOException; +import java.util.Properties; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.io.BatchUpdate; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapred.TableOutputFormat; +import 
org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.util.Progressable; + +public class HiveHBaseTableOutputFormat extends + TableOutputFormat implements + HiveOutputFormat { + + ImmutableBytesWritable KEY = new ImmutableBytesWritable(); + + /** + * update to the final out table, and output an empty key as the key + * + * @param jc + * the job configuration file + * @param finalOutPath + * the final output table name + * @param valueClass + * the value class used for create + * @param isCompressed + * whether the content is compressed or not + * @param tableProperties + * the tableInfo of this file's corresponding table + * @param progress + * progress used for status report + * @return the RecordWriter for the output file + */ + @Override + public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath, + Class valueClass, boolean isCompressed, + Properties tableProperties, Progressable progress) throws IOException { + jc.set(TableOutputFormat.OUTPUT_TABLE, finalOutPath.getName()); + + final org.apache.hadoop.mapred.RecordWriter tblWriter = + this.getRecordWriter(null, jc, null, progress); + return new RecordWriter() { + + @Override + public void close(boolean abort) throws IOException { + tblWriter.close(null); + } + + @Override + public void write(Writable w) throws IOException { + BatchUpdate bu = (BatchUpdate)w; + KEY.set(bu.getRow()); + tblWriter.write(KEY, bu); + } + + }; + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (revision 800178) +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (working copy) @@ -36,12 +36,16 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.mapred.TableInputFormat; +import org.apache.hadoop.hbase.mapred.TableSplit; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.plan.mapredWork; import org.apache.hadoop.hive.ql.plan.tableDesc; import org.apache.hadoop.hive.ql.plan.partitionDesc; +import org.apache.hadoop.hive.serde2.hbase.HBaseSerDe; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.FileInputFormat; @@ -97,6 +101,8 @@ public Path getPath() { if (inputSplit instanceof FileSplit) { return ((FileSplit)inputSplit).getPath(); + } else if (inputSplit instanceof TableSplit) { + return new Path("/" + Bytes.toString(((TableSplit)inputSplit).getTableName())); } return new Path(""); } @@ -179,6 +185,7 @@ (InputFormat)ReflectionUtils.newInstance(inputFormatClass, job); inputFormats.put(inputFormatClass, newInstance); } catch (Exception e) { + e.printStackTrace(); throw new IOException("Cannot create an instance of InputFormat class " + inputFormatClass.getName() + " as specified in mapredWork!"); } @@ -258,14 +265,21 @@ // for each dir, get the InputFormat, and do getSplits. 
     for(Path dir: dirs) {
+      FileInputFormat.setInputPaths(newjob, dir);
       tableDesc table = getTableDescFromPath(dir);
+      if (table.getSerdeClassName() != null &&
+          table.getSerdeClassName().equals(HBaseSerDe.class.getName())) {
+        String hbaseColumns = (String) table.getProperties().get("hbase.columns.mapping");
+        hbaseColumns = hbaseColumns.replace(',', ' ');
+        newjob.set(TableInputFormat.COLUMN_LIST, hbaseColumns);
+      }
+
       // create a new InputFormat instance if this is the first time to see this class
       Class inputFormatClass = table.getInputFileFormatClass();
-      InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
-
-      FileInputFormat.setInputPaths(newjob, dir);
+      InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, newjob);
+      newjob.setInputFormat(inputFormat.getClass());
 
-      InputSplit[] iss = inputFormat.getSplits(newjob, numSplits/dirs.length);
+      InputSplit[] iss = inputFormat.getSplits(newjob, (numSplits/dirs.length == 0) ? 1 : numSplits/dirs.length);
       for(InputSplit is: iss) {
         result.add(new HiveInputSplit(is, inputFormatClass.getName()));
       }
Index: ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (revision 800178)
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (working copy)
@@ -20,18 +20,30 @@
 import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.AbstractMap;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.derby.iapi.sql.dictionary.TableDescriptor;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.metastore.HiveMetaException;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -249,6 +261,8 @@
         tbl.setFields(MetaStoreUtils.getFieldsFromDeserializer(tbl.getName(), tbl.getDeserializer()));
       }
       tbl.checkValidity();
+      if (tbl.getInputFormatClass() == org.apache.hadoop.hive.ql.io.HiveHBaseTableInputFormat.class)
+        validateAndCreateHBaseTableIfNeeded(tbl);
       getMSC().createTable(tbl.getTTable());
     } catch (AlreadyExistsException e) {
       if (!ifNotExists) {
@@ -258,7 +272,63 @@
       throw new HiveException(e);
     }
   }
+
+  private void validateAndCreateHBaseTableIfNeeded(Table tbl)
+      throws URISyntaxException, HiveException, IOException {
+    String location = tbl.getTTable().getSd().getLocation();
+    if (location == null) { // no hbase location specified, generate one from the table name (hbase:///tablename)
+      location = tbl.getTTable().getTableName();
+    }
+    // make the location absolute
+    tbl.getTTable().getSd().setLocation("/"+location);
+
+    // Build the mapping schema
+    Set columnFamilies = new HashSet();
+    // Check the hbase columns and get all the families
+    String hbaseColumnStr = tbl.getSerdeParam("hbase.columns.mapping");
+    if (hbaseColumnStr == null)
+      throw new HiveException("No schema mapping defined in Serde.");
+    String[] hbaseColumns = hbaseColumnStr.split(",");
+    for(String hbaseColumn : hbaseColumns) {
+      int idx = hbaseColumn.indexOf(":");
+      if (idx < 0)
+        throw new HiveException(hbaseColumn + " is not a qualified hbase column.");
+      columnFamilies.add(hbaseColumn.substring(0, idx + 1));
+    }
+    // Check whether the given hbase table exists
+    HBaseConfiguration hbaseConf = new HBaseConfiguration(conf);
+    HBaseAdmin admin = new HBaseAdmin(hbaseConf);
+    HTableDescriptor tblDesc;
+
+    if (!admin.tableExists(location)) {
+
+      String externalStr = tbl.getProperty("EXTERNAL");
+      // it is not an external table, so create one
+      if (externalStr == null || !externalStr.equals("TRUE")) {
+        // Create all the column family descriptors
+        tblDesc = new HTableDescriptor(location);
+        for (String cf : columnFamilies) {
+          tblDesc.addFamily(new HColumnDescriptor(cf));
+        }
+
+        admin.createTable(tblDesc);
+      } else { // an external table
+        throw new HiveException("HBase table " + location +
+            " doesn't exist while the table is declared as an external table.");
+      }
+
+    } else { // make sure the schema mapping is right
+      tblDesc = admin.getTableDescriptor(location);
+      for (String cf : columnFamilies) {
+        if(!tblDesc.hasFamily(Bytes.toBytes(cf)))
+          throw new HiveException("Column Family " + cf + " is not defined in hbase table " + location);
+      }
+    }
+    // ensure the table is online
+    new HTable(hbaseConf, tblDesc.getName());
+  }
+
   /**
    * Drops table along with the data in it. If the table doesn't exist
    * then it is a no-op
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java (revision 800178)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java (working copy)
@@ -37,11 +37,12 @@
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Order;
-import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
 import org.apache.hadoop.hive.ql.plan.MsckDesc;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.io.HiveHBaseTableInputFormat;
+import org.apache.hadoop.hive.ql.io.HiveHBaseTableOutputFormat;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
@@ -61,7 +62,6 @@
 import org.apache.hadoop.hive.ql.plan.fetchWork;
 import org.apache.hadoop.hive.ql.plan.tableDesc;
 import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
-import org.apache.hadoop.hive.serde2.dynamic_type.DynamicSerDe;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -91,6 +91,8 @@
   private static final String SEQUENCEFILE_OUTPUT = SequenceFileOutputFormat.class.getName();
   private static final String RCFILE_INPUT = RCFileInputFormat.class.getName();
   private static final String RCFILE_OUTPUT = RCFileOutputFormat.class.getName();
+  private static final String HBASETABLE_INPUT = HiveHBaseTableInputFormat.class.getName();
+  private static final String HBASETABLE_OUTPUT = HiveHBaseTableOutputFormat.class.getName();
   private static final String COLUMNAR_SERDE = ColumnarSerDe.class.getName();
@@ -263,6 +265,10 @@
         outputFormat = RCFILE_OUTPUT;
         serde = COLUMNAR_SERDE;
         break;
+      case HiveParser.TOK_TBLHBASETABLE:
+        inputFormat = HBASETABLE_INPUT;
+        outputFormat = HBASETABLE_OUTPUT;
+        break;
       case HiveParser.TOK_TABLEFILEFORMAT:
         inputFormat = unescapeSQLString(child.getChild(0).getText());
         outputFormat = unescapeSQLString(child.getChild(1).getText());
@@ -273,6 +279,12 @@
       default: assert false;
       }
     }
+
+    if (inputFormat == HBASETABLE_INPUT && serde == null) {
+      throw new SemanticException("A serde must be specified when creating an hbase-backed hive table, "
+          + "because the serde defines the column mapping from the hbase table to the hive table.");
+    }
+
     if (likeTableName == null) {
       createTableDesc crtTblDesc =
         new createTableDesc(tableName, isExt, cols, partCols, bucketCols,
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g (revision 800178)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g (working copy)
@@ -108,6 +108,7 @@
 TOK_TBLSEQUENCEFILE;
 TOK_TBLTEXTFILE;
 TOK_TBLRCFILE;
+TOK_TBLHBASETABLE;
 TOK_TABLEFILEFORMAT;
 TOK_TABCOLNAME;
 TOK_TABLELOCATION;
@@ -429,6 +430,7 @@
       KW_STORED KW_AS KW_SEQUENCEFILE  -> TOK_TBLSEQUENCEFILE
     | KW_STORED KW_AS KW_TEXTFILE  -> TOK_TBLTEXTFILE
     | KW_STORED KW_AS KW_RCFILE  -> TOK_TBLRCFILE
+    | KW_STORED KW_AS KW_HBASETABLE  -> TOK_TBLHBASETABLE
     | KW_STORED KW_AS KW_INPUTFORMAT inFmt=StringLiteral KW_OUTPUTFORMAT outFmt=StringLiteral
       -> ^(TOK_TABLEFILEFORMAT $inFmt $outFmt)
     ;
@@ -1210,6 +1212,7 @@
 KW_SEQUENCEFILE: 'SEQUENCEFILE';
 KW_TEXTFILE: 'TEXTFILE';
 KW_RCFILE: 'RCFILE';
+KW_HBASETABLE: 'HBASETABLE';
 KW_INPUTFORMAT: 'INPUTFORMAT';
 KW_OUTPUTFORMAT: 'OUTPUTFORMAT';
 KW_LOCATION: 'LOCATION';
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 800178)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy)
@@ -55,6 +55,7 @@
 import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
 import org.apache.hadoop.hive.ql.exec.UDF;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.HiveHBaseTableOutputFormat;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
@@ -2423,20 +2424,29 @@
       if(parts != null && parts.size() > 0) {
         throw new SemanticException(ErrorMsg.NEED_PARTITION_ERROR.getMsg());
       }
+
+      // if the destination table is an hbase-backed hive table
+      boolean isHBaseTable = dest_tab.getOutputFormatClass() == HiveHBaseTableOutputFormat.class;
+
       dest_path = dest_tab.getPath();
-      queryTmpdir = ctx.getExternalTmpFileURI(dest_path.toUri());
+      if (isHBaseTable)
+        queryTmpdir = dest_path.toUri().toString();
+      else
+        queryTmpdir = ctx.getExternalTmpFileURI(dest_path.toUri());
       table_desc = Utilities.getTableDesc(dest_tab);
 
       this.idToTableNameMap.put( String.valueOf(this.destTableId), dest_tab.getName());
       currentTableId = this.destTableId;
       this.destTableId ++;
 
-      // Create the work for moving the table
-      this.loadTableWork.add
-        (new loadTableDesc(queryTmpdir,
-                           ctx.getExternalTmpFileURI(dest_path.toUri()),
-                           table_desc,
-                           new HashMap()));
+      if (!isHBaseTable) {
+        // Create the work for moving the table
+        this.loadTableWork.add
+          (new
loadTableDesc(queryTmpdir, + ctx.getExternalTmpFileURI(dest_path.toUri()), + table_desc, + new HashMap())); + } outputs.add(new WriteEntity(dest_tab)); break; } Index: ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java =================================================================== --- ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java (revision 800178) +++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java (working copy) @@ -26,10 +26,12 @@ import junit.framework.TestCase; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter; import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat; @@ -48,6 +50,7 @@ import org.apache.hadoop.hive.ql.plan.reduceSinkDesc; import org.apache.hadoop.hive.ql.plan.scriptDesc; import org.apache.hadoop.hive.ql.plan.selectDesc; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde.Constants; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.mapred.TextInputFormat; @@ -466,7 +469,30 @@ private void executePlan(File planFile) throws Exception { String testName = new Exception().getStackTrace()[1].getMethodName(); - String cmdLine = conf.getVar(HiveConf.ConfVars.HADOOPBIN) + " jar " + conf.getJar() + + + String libJarsOption; + { + String addedJars = ExecDriver.getResourceFiles(conf, SessionState.ResourceType.JAR); + conf.setVar(ConfVars.HIVEADDEDJARS, addedJars); + + String auxJars = conf.getAuxJars(); + // Put auxjars and addedjars together into libjars + if (StringUtils.isEmpty(addedJars)) { + if (StringUtils.isEmpty(auxJars)) { + libJarsOption = " "; + } else { + libJarsOption = " -libjars " + auxJars + " "; + } + } else { + if (StringUtils.isEmpty(auxJars)) { + libJarsOption = " -libjars " + addedJars + " "; + } else { + libJarsOption = " -libjars " + addedJars + "," + auxJars + " "; + } + } + } + + String cmdLine = conf.getVar(HiveConf.ConfVars.HADOOPBIN) + " jar " + libJarsOption + conf.getJar() + " org.apache.hadoop.hive.ql.exec.ExecDriver -plan " + planFile.toString() + " " + ExecDriver.generateCmdLine(conf); System.out.println("Executing: " + cmdLine); Index: ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java =================================================================== --- ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java (revision 800178) +++ ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java (working copy) @@ -24,6 +24,7 @@ import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.FileWriter; +import java.io.IOException; import java.io.PrintStream; import java.io.Serializable; import java.net.URI; @@ -37,13 +38,22 @@ import java.lang.reflect.Method; import java.lang.reflect.Constructor; +import org.apache.derby.iapi.sql.dictionary.TableDescriptor; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.client.HConnectionManager; +import 
org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hive.cli.CliDriver; import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -90,6 +100,8 @@ private Object dfs = null; private boolean miniMr = false; private Class dfsClass = null; + private MiniHBaseCluster hbase = null; + private final int NUM_REGIONSERVERS = 1; public boolean deleteDirectory(File path) { if (path.exists()) { @@ -164,10 +176,10 @@ } public QTestUtil(String outDir, String logDir) throws Exception { - this(outDir, logDir, false); + this(outDir, logDir, false, false); } - public QTestUtil(String outDir, String logDir, boolean miniMr) throws Exception { + public QTestUtil(String outDir, String logDir, boolean miniMr, boolean miniHBase) throws Exception { this.outDir = outDir; this.logDir = logDir; conf = new HiveConf(Driver.class); @@ -207,7 +219,25 @@ conf.set("hive.metastore.warehouse.dir", fsName.concat("/build/ql/test/data/warehouse/")); conf.set("mapred.job.tracker", "localhost:" + mr.getJobTrackerPort()); - } + } + + if (miniHBase) { + // Setup the hbase Cluster + try { + conf.setVar(ConfVars.HBASEMASTER, "local"); + HBaseConfiguration hbaseConf = new HBaseConfiguration(conf); + hbase = new MiniHBaseCluster(hbaseConf, NUM_REGIONSERVERS); + conf.setVar(ConfVars.HBASEMASTER, hbase.getHMasterAddress().toString()); + // opening the META table ensures that cluster is running + new HTable(new HBaseConfiguration(conf), HConstants.META_TABLE_NAME); + } catch (IOException ie) { + ie.printStackTrace(); + if (hbase != null) { + hbase.shutdown(); + } + throw ie; + } + } // System.out.println(conf.toString()); testFiles = conf.get("test.data.files").replace('\\', '/').replace("c:", ""); @@ -225,6 +255,12 @@ public void shutdown() throws Exception { cleanUp(); + if (hbase != null) { + HConnectionManager.deleteConnectionInfo(new HBaseConfiguration(conf), true); + hbase.shutdown(); + hbase = null; + } + if (dfs != null) { Method m = dfsClass.getDeclaredMethod("shutdown", new Class[]{}); m.invoke(dfs, new Object[]{}); Index: ql/src/test/queries/clienthbase/hbase_queries.q =================================================================== --- ql/src/test/queries/clienthbase/hbase_queries.q (revision 0) +++ ql/src/test/queries/clienthbase/hbase_queries.q (revision 0) @@ -0,0 +1,64 @@ +DROP TABLE hbase_table_1; +CREATE TABLE hbase_table_1(key int, value string) +ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.hbase.HBaseSerDe' +WITH SERDEPROPERTIES ( +"hbase.columns.mapping" = "cf:string" +) STORED AS HBASETABLE; + +EXPLAIN FROM src INSERT OVERWRITE TABLE hbase_table_1 SELECT *; +FROM src INSERT OVERWRITE TABLE hbase_table_1 SELECT *; + +DROP TABLE hbase_table_2; +CREATE EXTERNAL TABLE hbase_table_2(key int, value string) +ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.hbase.HBaseSerDe' +WITH SERDEPROPERTIES ( +"hbase.columns.mapping" = "cf:string" +) STORED AS HBASETABLE LOCATION "hbase_table_1"; + +EXPLAIN FROM +(SELECT hbase_table_1.* FROM hbase_table_1) x +JOIN +(SELECT src.* FROM src) Y +ON (x.key = Y.key) +SELECT Y.* LIMIT 20; + +FROM +(SELECT hbase_table_1.* FROM hbase_table_1) x +JOIN +(SELECT src.* FROM src) Y +ON (x.key = Y.key) +SELECT Y.* LIMIT 20; + +EXPLAIN FROM +(SELECT hbase_table_1.* FROM hbase_table_1 
WHERE hbase_table_1.key > 100) x +JOIN +(SELECT hbase_table_2.* FROM hbase_table_2 WHERE hbase_table_2.key < 120) Y +ON (x.key = Y.key) +SELECT Y.*; + +FROM +(SELECT hbase_table_1.* FROM hbase_table_1 WHERE hbase_table_1.key > 100) x +JOIN +(SELECT hbase_table_2.* FROM hbase_table_2 WHERE hbase_table_2.key < 120) Y +ON (x.key = Y.key) +SELECT Y.*; + +DROP TABLE empty_hbase_table; +CREATE TABLE empty_hbase_table(key int, value string) +ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.hbase.HBaseSerDe' +WITH SERDEPROPERTIES ( +"hbase.columns.mapping" = "cf:string" +) STORED AS HBASETABLE; + +DROP TABLE empty_normal_table; +CREATE TABLE empty_normal_table(key int, value string); + +select * from (select count(1) from empty_normal_table union all select count(1) from empty_hbase_table) x; +select * from (select count(1) from empty_normal_table union all select count(1) from hbase_table_1) x; +select * from (select count(1) from src union all select count(1) from empty_hbase_table) x; +select * from (select count(1) from src union all select count(1) from hbase_table_1) x; + +DROP TABLE hbase_table_1; +DROP TABLE hbase_table_2; +DROP TABLE empty_hbase_table; +DROP TABLE empty_normal_table; \ No newline at end of file Index: ql/src/test/results/clienthbase/hbase_queries.q.out =================================================================== --- ql/src/test/results/clienthbase/hbase_queries.q.out (revision 0) +++ ql/src/test/results/clienthbase/hbase_queries.q.out (revision 0) @@ -0,0 +1,313 @@ +query: DROP TABLE hbase_table_1 +query: CREATE TABLE hbase_table_1(key int, value string) +ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.hbase.HBaseSerDe' +WITH SERDEPROPERTIES ( +"hbase.columns.mapping" = "cf:string" +) STORED AS HBASETABLE +query: EXPLAIN FROM src INSERT OVERWRITE TABLE hbase_table_1 SELECT * +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_TAB hbase_table_1)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)))) + +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Map Reduce + Alias -> Map Operator Tree: + src + Select Operator + expressions: + expr: key + type: string + expr: value + type: string + outputColumnNames: _col0, _col1 + Select Operator + expressions: + expr: UDFToInteger(_col0) + type: int + expr: _col1 + type: string + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 1 + table: + input format: org.apache.hadoop.hive.ql.io.HiveHBaseTableInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveHBaseTableOutputFormat + serde: org.apache.hadoop.hive.serde2.hbase.HBaseSerDe + name: hbase_table_1 + + +query: FROM src INSERT OVERWRITE TABLE hbase_table_1 SELECT * +Input: default/src +Output: default/hbase_table_1 +query: DROP TABLE hbase_table_2 +query: CREATE EXTERNAL TABLE hbase_table_2(key int, value string) +ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.hbase.HBaseSerDe' +WITH SERDEPROPERTIES ( +"hbase.columns.mapping" = "cf:string" +) STORED AS HBASETABLE LOCATION "hbase_table_1" +query: EXPLAIN FROM +(SELECT hbase_table_1.* FROM hbase_table_1) x +JOIN +(SELECT src.* FROM src) Y +ON (x.key = Y.key) +SELECT Y.* LIMIT 20 +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF hbase_table_1)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_ALLCOLREF hbase_table_1))))) x) (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) 
(TOK_SELECT (TOK_SELEXPR (TOK_ALLCOLREF src))))) Y) (= (. (TOK_TABLE_OR_COL x) key) (. (TOK_TABLE_OR_COL Y) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_ALLCOLREF Y))) (TOK_LIMIT 20))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + x:hbase_table_1 + Select Operator + expressions: + expr: key + type: int + outputColumnNames: _col0 + Reduce Output Operator + key expressions: + expr: UDFToDouble(_col0) + type: double + sort order: + + Map-reduce partition columns: + expr: UDFToDouble(_col0) + type: double + tag: 0 + y:src + Select Operator + expressions: + expr: key + type: string + expr: value + type: string + outputColumnNames: _col0, _col1 + Reduce Output Operator + key expressions: + expr: UDFToDouble(_col0) + type: double + sort order: + + Map-reduce partition columns: + expr: UDFToDouble(_col0) + type: double + tag: 1 + value expressions: + expr: _col0 + type: string + expr: _col1 + type: string + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 + 1 {VALUE._col0} {VALUE._col1} + outputColumnNames: _col2, _col3 + Select Operator + expressions: + expr: _col2 + type: string + expr: _col3 + type: string + outputColumnNames: _col0, _col1 + Limit + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: 20 + + +query: FROM +(SELECT hbase_table_1.* FROM hbase_table_1) x +JOIN +(SELECT src.* FROM src) Y +ON (x.key = Y.key) +SELECT Y.* LIMIT 20 +Input: default/hbase_table_1 +Input: default/src +Output: file:/home/lmsp/projects/hive/build/ql/tmp/268562938/10000 +0 val_0 +0 val_0 +0 val_0 +2 val_2 +4 val_4 +5 val_5 +5 val_5 +5 val_5 +8 val_8 +9 val_9 +10 val_10 +11 val_11 +12 val_12 +12 val_12 +15 val_15 +15 val_15 +17 val_17 +18 val_18 +18 val_18 +19 val_19 +query: EXPLAIN FROM +(SELECT hbase_table_1.* FROM hbase_table_1 WHERE hbase_table_1.key > 100) x +JOIN +(SELECT hbase_table_2.* FROM hbase_table_2 WHERE hbase_table_2.key < 120) Y +ON (x.key = Y.key) +SELECT Y.* +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF hbase_table_1)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_ALLCOLREF hbase_table_1))) (TOK_WHERE (> (. (TOK_TABLE_OR_COL hbase_table_1) key) 100)))) x) (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF hbase_table_2)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_ALLCOLREF hbase_table_2))) (TOK_WHERE (< (. (TOK_TABLE_OR_COL hbase_table_2) key) 120)))) Y) (= (. (TOK_TABLE_OR_COL x) key) (. 
(TOK_TABLE_OR_COL Y) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_ALLCOLREF Y))))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + y:hbase_table_2 + Filter Operator + predicate: + expr: (key < 120) + type: boolean + Filter Operator + predicate: + expr: (key < 120) + type: boolean + Select Operator + expressions: + expr: key + type: int + expr: value + type: string + outputColumnNames: _col0, _col1 + Reduce Output Operator + key expressions: + expr: _col0 + type: int + sort order: + + Map-reduce partition columns: + expr: _col0 + type: int + tag: 1 + value expressions: + expr: _col0 + type: int + expr: _col1 + type: string + x:hbase_table_1 + Filter Operator + predicate: + expr: (key > 100) + type: boolean + Filter Operator + predicate: + expr: (key > 100) + type: boolean + Select Operator + expressions: + expr: key + type: int + outputColumnNames: _col0 + Reduce Output Operator + key expressions: + expr: _col0 + type: int + sort order: + + Map-reduce partition columns: + expr: _col0 + type: int + tag: 0 + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 + 1 {VALUE._col0} {VALUE._col1} + outputColumnNames: _col2, _col3 + Select Operator + expressions: + expr: _col2 + type: int + expr: _col3 + type: string + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +query: FROM +(SELECT hbase_table_1.* FROM hbase_table_1 WHERE hbase_table_1.key > 100) x +JOIN +(SELECT hbase_table_2.* FROM hbase_table_2 WHERE hbase_table_2.key < 120) Y +ON (x.key = Y.key) +SELECT Y.* +Input: default/hbase_table_2 +Input: default/hbase_table_1 +Output: file:/home/lmsp/projects/hive/build/ql/tmp/394656156/10000 +103 val_103 +104 val_104 +105 val_105 +111 val_111 +113 val_113 +114 val_114 +116 val_116 +118 val_118 +119 val_119 +query: DROP TABLE empty_hbase_table +query: CREATE TABLE empty_hbase_table(key int, value string) +ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.hbase.HBaseSerDe' +WITH SERDEPROPERTIES ( +"hbase.columns.mapping" = "cf:string" +) STORED AS HBASETABLE +query: DROP TABLE empty_normal_table +query: CREATE TABLE empty_normal_table(key int, value string) +query: select * from (select count(1) from empty_normal_table union all select count(1) from empty_hbase_table) x +Input: default/empty_hbase_table +Input: default/empty_normal_table +Output: file:/home/lmsp/projects/hive/build/ql/tmp/1480628629/10000 +0 +0 +query: select * from (select count(1) from empty_normal_table union all select count(1) from hbase_table_1) x +Input: default/empty_normal_table +Input: default/hbase_table_1 +Output: file:/home/lmsp/projects/hive/build/ql/tmp/1722384298/10000 +309 +0 +query: select * from (select count(1) from src union all select count(1) from empty_hbase_table) x +Input: default/empty_hbase_table +Input: default/src +Output: file:/home/lmsp/projects/hive/build/ql/tmp/1091258489/10000 +500 +0 +query: select * from (select count(1) from src union all select count(1) from hbase_table_1) x +Input: default/src +Input: default/hbase_table_1 +Output: file:/home/lmsp/projects/hive/build/ql/tmp/1515840724/10000 +309 +500 +query: DROP TABLE hbase_table_1 +query: DROP TABLE 
hbase_table_2
+query: DROP TABLE empty_hbase_table
+query: DROP TABLE empty_normal_table
Index: ql/src/test/templates/TestCliDriver.vm
===================================================================
--- ql/src/test/templates/TestCliDriver.vm (revision 800178)
+++ ql/src/test/templates/TestCliDriver.vm (working copy)
@@ -36,7 +36,10 @@
       if ("$clusterMode".equals("miniMR"))
         miniMR = true;
 
-      qt = new QTestUtil("$resultsDir.getCanonicalPath()", "$logDir.getCanonicalPath()", miniMR);
+      // Since hbase queries need a MiniHBaseCluster, they are kept separate
+      // from the normal hive queries so that the cluster is started only
+      // while the hbase queries are running.
+      qt = new QTestUtil("$resultsDir.getCanonicalPath()", "$logDir.getCanonicalPath()", miniMR, $miniHBase);
 
 #foreach ($qf in $qfiles)
   qt.addFile("$qf.getCanonicalPath()");
Index: serde/src/java/org/apache/hadoop/hive/serde2/hbase/HBaseSerDe.java
===================================================================
--- serde/src/java/org/apache/hadoop/hive/serde2/hbase/HBaseSerDe.java (revision 0)
+++ serde/src/java/org/apache/hadoop/hive/serde2/hbase/HBaseSerDe.java (revision 0)
@@ -0,0 +1,456 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.hive.serde2.hbase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.io.BatchUpdate; +import org.apache.hadoop.hbase.io.RowResult; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hive.serde.Constants; +import org.apache.hadoop.hive.serde2.ByteStream; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.lazy.LazyFactory; +import org.apache.hadoop.hive.serde2.lazy.LazyHBaseRow; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.hive.serde2.lazy.LazyUtils; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; + +/** + * HBaseSerDe can be used to serialize object into hbase table and + * deserialize object from hbase table. + */ +public class HBaseSerDe implements SerDe { + + public static final Log LOG = LogFactory.getLog( + HBaseSerDe.class.getName()); + + public static class HBaseSerDeParameters { + List hbaseColumnNames; + SerDeParameters serdeParams; + + public List getHBaseColumnNames() { + return hbaseColumnNames; + } + + public SerDeParameters getSerDeParameters() { + return serdeParams; + } + } + + private ObjectInspector mCachedObjectInspector; + HBaseSerDeParameters mHBaseSerDeParameters = null; + private boolean useJSONSerialize; // use json to serialize + + public String toString() { + return getClass().toString() + + "[" + + mHBaseSerDeParameters.hbaseColumnNames + + ":" + + ((StructTypeInfo) mHBaseSerDeParameters.serdeParams.getRowTypeInfo()) + .getAllStructFieldNames() + + ":" + + ((StructTypeInfo) mHBaseSerDeParameters.serdeParams.getRowTypeInfo()) + .getAllStructFieldTypeInfos() + "]"; + } + + public HBaseSerDe() throws SerDeException { + } + + /** + * Initialize the SerDe given parameters. 
+ * @see SerDe#initialize(Configuration, Properties) + */ + public void initialize(Configuration conf, Properties tbl) + throws SerDeException { + mHBaseSerDeParameters = HBaseSerDe.initHBaseSerDeParameters(conf, tbl, + getClass().getName()); + + // We just used columnNames & columnTypes these two parameters + mCachedObjectInspector = LazyFactory.createLazyStructInspector( + mHBaseSerDeParameters.serdeParams.getColumnNames(), + mHBaseSerDeParameters.serdeParams.getColumnTypes(), + mHBaseSerDeParameters.serdeParams.getSeparators(), + mHBaseSerDeParameters.serdeParams.getNullSequence(), + mHBaseSerDeParameters.serdeParams.isLastColumnTakesRest(), + mHBaseSerDeParameters.serdeParams.isEscaped(), + mHBaseSerDeParameters.serdeParams.getEscapeChar()); + + mCachedHBaseRow = new LazyHBaseRow((LazySimpleStructObjectInspector)mCachedObjectInspector); + + if (LOG.isDebugEnabled()) { + LOG.debug("HBaseSerDe initialized with : columnNames = " + + mHBaseSerDeParameters.serdeParams.getColumnNames() + " columnTypes = " + + mHBaseSerDeParameters.serdeParams.getColumnTypes() + " hbaseColumnMapping = " + + mHBaseSerDeParameters.hbaseColumnNames); + } + } + + public static HBaseSerDeParameters initHBaseSerDeParameters( + Configuration job, Properties tbl, String serdeName) + throws SerDeException { + HBaseSerDeParameters serdeParams = new HBaseSerDeParameters(); + + // Read Configuration Parameter + String hbaseColumnNameProperty = tbl.getProperty("hbase.columns.mapping"); + String columnTypeProperty = tbl.getProperty(Constants.LIST_COLUMN_TYPES); + + // Initial the hbase column list + if (hbaseColumnNameProperty != null && hbaseColumnNameProperty.length() > 0) { + serdeParams.hbaseColumnNames = Arrays.asList(hbaseColumnNameProperty.split(",")); + } else { + serdeParams.hbaseColumnNames = new ArrayList(); + } + + // Add the hbase key to the columnNameList and columnTypeList + + // Build the type property string + if (columnTypeProperty == null) { + StringBuilder sb = new StringBuilder(); + sb.append(Constants.STRING_TYPE_NAME); + + for (int i = 0; i < serdeParams.hbaseColumnNames.size(); i++) { + String colName = serdeParams.hbaseColumnNames.get(i); + if(colName.endsWith(":")) + sb.append(":").append(Constants.MAP_TYPE_NAME + "<" + + Constants.STRING_TYPE_NAME + "," + Constants.STRING_TYPE_NAME + ">"); + else + sb.append(":").append(Constants.STRING_TYPE_NAME); + } + tbl.setProperty(Constants.LIST_COLUMN_TYPES, sb.toString()); + } + + serdeParams.serdeParams = LazySimpleSerDe.initSerdeParams(job, tbl, serdeName); + + if (serdeParams.hbaseColumnNames.size() + 1 != serdeParams.serdeParams.getColumnNames().size()) { + throw new SerDeException(serdeName + ": columns has " + + serdeParams.serdeParams.getColumnNames().size() + + " elements while hbase.columns.mapping has " + + serdeParams.hbaseColumnNames.size() + " elements!"); + } + + // check the mapping schema is right? 
+ // we just can make sure that "columnfamily:" is mapped to MAP + for (int i = 0; i < serdeParams.hbaseColumnNames.size(); i++) { + String hbaseColName = serdeParams.hbaseColumnNames.get(i); + if(hbaseColName.endsWith(":")) { + TypeInfo typeInfo = serdeParams.serdeParams.getColumnTypes().get(i+1); + if(typeInfo.getCategory() == Category.MAP && + ((MapTypeInfo)typeInfo).getMapKeyTypeInfo().getTypeName() != Constants.STRING_TYPE_NAME) { + throw new SerDeException(serdeName + ": hbase column family '" + + hbaseColName + "' should be mapped to Map while being mapped to " + + ((MapTypeInfo)typeInfo).getMapKeyTypeInfo().getTypeName()); + } + } + } + + return serdeParams; + } + + // The object for storing hbase row data. + LazyHBaseRow mCachedHBaseRow; + + /** + * Deserialize a row from the HBase RowResult writable to a LazyObject + * @param rowResult the HBase RowResult Writable contain a row + * @return the deserialized object + * @see SerDe#deserialize(Writable) + */ + public Object deserialize(Writable rowResult) throws SerDeException { + + if (!(rowResult instanceof RowResult)) { + throw new SerDeException(getClass().getName() + ": expects RowResult!"); + } + + RowResult rr = (RowResult)rowResult; + mCachedHBaseRow.init(rr, mHBaseSerDeParameters.hbaseColumnNames); + return mCachedHBaseRow; + } + + @Override + public ObjectInspector getObjectInspector() throws SerDeException { + return mCachedObjectInspector; + } + + BatchUpdate serializeCache = null; + ByteStream.Output serializeStream = new ByteStream.Output(); + + @Override + public Class getSerializedClass() { + return BatchUpdate.class; + } + + @Override + public Writable serialize(Object obj, ObjectInspector objInspector) + throws SerDeException { + if (objInspector.getCategory() != Category.STRUCT) { + throw new SerDeException(getClass().toString() + + " can only serialize struct types, but we got: " + + objInspector.getTypeName()); + } + + // Prepare the field ObjectInspectors + StructObjectInspector soi = (StructObjectInspector)objInspector; + List fields = soi.getAllStructFieldRefs(); + List list = soi.getStructFieldsDataAsList(obj); + List declaredFields = + (mHBaseSerDeParameters.serdeParams.getRowTypeInfo() != null && + ((StructTypeInfo) mHBaseSerDeParameters.serdeParams.getRowTypeInfo()).getAllStructFieldNames().size()>0)? 
+ ((StructObjectInspector)getObjectInspector()).getAllStructFieldRefs(): null; + + boolean isNotNull = false; + String hbaseColumn = ""; + + try { + // Serialize each field + for (int i=0; i= declaredFields.size()) { + throw new SerDeException( + "Error: expecting " + declaredFields.size() + + " but asking for field " + i + "\n" + "data=" + obj + "\n" + + "tableType=" + mHBaseSerDeParameters.serdeParams.getRowTypeInfo().toString() + "\n" + + "dataType=" + + TypeInfoUtils.getTypeInfoFromObjectInspector(objInspector)); + } + + if (f == null) // a null object, we do not serialize it + continue; + + if (i > 0) + hbaseColumn = mHBaseSerDeParameters.hbaseColumnNames.get(i-1); + + // If the field that is column family in hbase + if(i > 0 && hbaseColumn.endsWith(":")) { + MapObjectInspector moi = (MapObjectInspector)foi; + ObjectInspector koi = moi.getMapKeyObjectInspector(); + ObjectInspector voi = moi.getMapValueObjectInspector(); + + Map map = moi.getMap(obj); + if (map == null) { + continue; + } else { + for (Map.Entry entry: map.entrySet()) { + // Get the Key + serialize(serializeStream, entry.getKey(), koi, + mHBaseSerDeParameters.serdeParams.getSeparators(), 3, + mHBaseSerDeParameters.serdeParams.getNullSequence(), + mHBaseSerDeParameters.serdeParams.isEscaped(), + mHBaseSerDeParameters.serdeParams.getEscapeChar(), + mHBaseSerDeParameters.serdeParams.getNeedsEscape()); + + // generate a column name (column_family:column_name) + hbaseColumn += Bytes.toString(serializeStream.getData()); + + // Get the Value + serializeStream.reset(); + + isNotNull = serialize(serializeStream, entry.getValue(), voi, + mHBaseSerDeParameters.serdeParams.getSeparators(), 3, + mHBaseSerDeParameters.serdeParams.getNullSequence(), + mHBaseSerDeParameters.serdeParams.isEscaped(), + mHBaseSerDeParameters.serdeParams.getEscapeChar(), + mHBaseSerDeParameters.serdeParams.getNeedsEscape()); + } + } + } else { + // If the field that is passed in is NOT a primitive, and either the + // field is not declared (no schema was given at initialization), or + // the field is declared as a primitive in initialization, serialize + // the data to JSON string. Otherwise serialize the data in the + // delimited way. 
+          if (!foi.getCategory().equals(Category.PRIMITIVE)
+              && (declaredFields == null ||
+                  declaredFields.get(i).getFieldObjectInspector().getCategory()
+                    .equals(Category.PRIMITIVE) || useJSONSerialize)) {
+            isNotNull = serialize(serializeStream, SerDeUtils.getJSONString(f, foi),
+                PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+                mHBaseSerDeParameters.serdeParams.getSeparators(), 1,
+                mHBaseSerDeParameters.serdeParams.getNullSequence(),
+                mHBaseSerDeParameters.serdeParams.isEscaped(),
+                mHBaseSerDeParameters.serdeParams.getEscapeChar(),
+                mHBaseSerDeParameters.serdeParams.getNeedsEscape());
+          } else {
+            isNotNull = serialize(serializeStream, f, foi,
+                mHBaseSerDeParameters.serdeParams.getSeparators(), 1,
+                mHBaseSerDeParameters.serdeParams.getNullSequence(),
+                mHBaseSerDeParameters.serdeParams.isEscaped(),
+                mHBaseSerDeParameters.serdeParams.getEscapeChar(),
+                mHBaseSerDeParameters.serdeParams.getNeedsEscape());
+          }
+        }
+
+        byte[] key = new byte[serializeStream.getCount()];
+        System.arraycopy(serializeStream.getData(), 0, key, 0, serializeStream.getCount());
+        if (i == 0) { // the first column is the hbase key
+          serializeCache = new BatchUpdate(key);
+        } else {
+          if (isNotNull)
+            serializeCache.put(hbaseColumn, key);
+        }
+      }
+    } catch (IOException e) {
+      throw new SerDeException(e);
+    }
+
+    return serializeCache;
+  }
+
+  /**
+   * Serialize the given object into the output byte stream.
+   * @param out The ByteStream.Output to store the serialized data.
+   * @param obj The object for the current field.
+   * @param objInspector The ObjectInspector for the current Object.
+   * @param separators The separators array.
+   * @param level The current level of separator.
+   * @param nullSequence The byte sequence representing the NULL value.
+   * @param escaped Whether we need to escape the data when writing out
+   * @param escapeChar Which char to use as the escape char, e.g. '\\'
+   * @param needsEscape Which chars need to be escaped.  This array should have size of 128.
+   *                    Negative byte values (or byte values >= 128) are never escaped.
+   * @throws IOException
+   * @return true if a non-null object was serialized; false otherwise.
+   *
+   * Note: copied from LazySimpleSerDe, with one difference: we do not serialize a null
+   * object to hbase.
+   */
+  public static boolean serialize(ByteStream.Output out, Object obj,
+      ObjectInspector objInspector, byte[] separators, int level,
+      Text nullSequence, boolean escaped, byte escapeChar, boolean[] needsEscape) throws IOException {
+
+    switch (objInspector.getCategory()) {
+      case PRIMITIVE: {
+        LazyUtils.writePrimitiveUTF8(out, obj, (PrimitiveObjectInspector)objInspector, escaped, escapeChar, needsEscape);
+        return true;
+      }
+      case LIST: {
+        char separator = (char)separators[level];
+        ListObjectInspector loi = (ListObjectInspector)objInspector;
+        List<?> list = loi.getList(obj);
+        ObjectInspector eoi = loi.getListElementObjectInspector();
+        if (list == null) {
+          return false;
+        } else {
+          for (int i = 0; i < list.size(); i++) {
+            if (i > 0) {
+              out.write(separator);
+            }
+            serialize(out, list.get(i), eoi, separators, level+1,
+                nullSequence, escaped, escapeChar, needsEscape);
+          }
+        }
+        return true;
+      }
+      case MAP: {
+        char separator = (char)separators[level];
+        char keyValueSeparator = (char)separators[level+1];
+        MapObjectInspector moi = (MapObjectInspector)objInspector;
+        ObjectInspector koi = moi.getMapKeyObjectInspector();
+        ObjectInspector voi = moi.getMapValueObjectInspector();
+
+        Map<?, ?> map = moi.getMap(obj);
+        if (map == null) {
+          return false;
+        } else {
+          boolean first = true;
+          for (Map.Entry<?, ?> entry : map.entrySet()) {
+            if (first) {
+              first = false;
+            } else {
+              out.write(separator);
+            }
+            serialize(out, entry.getKey(), koi, separators, level+2,
+                nullSequence, escaped, escapeChar, needsEscape);
+            out.write(keyValueSeparator);
+            serialize(out, entry.getValue(), voi, separators, level+2,
+                nullSequence, escaped, escapeChar, needsEscape);
+          }
+        }
+        return true;
+      }
+      case STRUCT: {
+        char separator = (char)separators[level];
+        StructObjectInspector soi = (StructObjectInspector)objInspector;
+        List<? extends StructField> fields = soi.getAllStructFieldRefs();
+        List<Object> list = soi.getStructFieldsDataAsList(obj);
+        if (list == null) {
+          return false;
+        } else {
+          for (int i = 0; i < list.size(); i++) {
+            if (i > 0) {
+              out.write(separator);
+            }
+            serialize(out, list.get(i),
+                fields.get(i).getFieldObjectInspector(), separators, level+1,
+                nullSequence, escaped, escapeChar, needsEscape);
+          }
+        }
+        return true;
+      }
+    }
+
+    throw new RuntimeException("Unknown category type: " +
+        objInspector.getCategory());
+  }
+
+
+  /**
+   * @return the useJSONSerialize
+   */
+  public boolean isUseJSONSerialize() {
+    return useJSONSerialize;
+  }
+
+  /**
+   * @param useJSONSerialize the useJSONSerialize to set
+   */
+  public void setUseJSONSerialize(boolean useJSONSerialize) {
+    this.useJSONSerialize = useJSONSerialize;
+  }
+
+}
Index: serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyHBaseCellMap.java
===================================================================
--- serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyHBaseCellMap.java (revision 0)
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyHBaseCellMap.java (revision 0)
@@ -0,0 +1,119 @@
+package org.apache.hadoop.hive.serde2.lazy;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+
+public class LazyHBaseCellMap extends LazyMap {
+
+  RowResult rowResult;
+  String hbaseColumnFamily;
+
+  /**
+   * Construct a LazyHBaseCellMap object with the ObjectInspector.
+   * @param oi
+   */
+  public LazyHBaseCellMap(LazyMapObjectInspector oi) {
+    super(oi);
+  }
+
+  @Override
+  public void init(ByteArrayRef bytes, int start, int length) {
+    // do nothing
+  }
+
+  public void init(RowResult rr, String columnFamily) {
+    rowResult = rr;
+    hbaseColumnFamily = columnFamily;
+    parsed = false;
+  }
+
+  private void parse() {
+    if (cachedMap == null) {
+      cachedMap = new LinkedHashMap<Object, Object>();
+    } else {
+      cachedMap.clear();
+    }
+
+    Iterator<byte[]> iter = rowResult.keySet().iterator();
+
+    byte[] columnFamily = hbaseColumnFamily.getBytes();
+    while (iter.hasNext()) {
+      byte[] columnKey = iter.next();
+      if (columnFamily.length > columnKey.length)
+        continue;
+
+      if (0 == LazyUtils.compare(columnFamily, 0, columnFamily.length,
+          columnKey, 0, columnFamily.length)) {
+        byte[] columnValue = rowResult.get(columnKey).getValue();
+        if (columnValue == null || columnValue.length == 0) // an empty object
+          continue;
+
+        // Keys are always primitive
+        LazyPrimitive key = LazyFactory.createLazyPrimitiveClass(
+            (PrimitiveObjectInspector)((MapObjectInspector)oi).getMapKeyObjectInspector());
+        ByteArrayRef keyRef = new ByteArrayRef();
+        keyRef.setData(columnKey);
+        key.init(keyRef, columnFamily.length, columnKey.length - columnFamily.length);
+
+        // Value
+        LazyObject value = LazyFactory.createLazyObject(
+            ((MapObjectInspector)oi).getMapValueObjectInspector());
+        ByteArrayRef valueRef = new ByteArrayRef();
+        valueRef.setData(columnValue);
+        value.init(valueRef, 0, columnValue.length);
+
+        // Put it onto the map
+        cachedMap.put(key.getObject(), value.getObject());
+      }
+    }
+  }
+
+  /**
+   * Get the value in the map for the given key.
+   *
+   * @param key the key object
+   * @return the corresponding value, or null if the key is not in the map
+   */
+  public Object getMapValueElement(Object key) {
+    if (!parsed) {
+      parse();
+    }
+
+    for (Map.Entry<Object, Object> entry : cachedMap.entrySet()) {
+      LazyPrimitive lazyKeyI = (LazyPrimitive)entry.getKey();
+      // getWritableObject() will convert LazyPrimitive to actual primitive writable objects.
+      Object keyI = lazyKeyI.getWritableObject();
+      if (keyI == null) continue;
+      if (keyI.equals(key)) {
+        // Got a match, return the value
+        LazyObject v = (LazyObject)entry.getValue();
+        return v == null ? v : v.getObject();
+      }
+    }
+
+    return null;
+  }
+
+  public Map getMap() {
+    if (!parsed) {
+      parse();
+    }
+    return cachedMap;
+  }
+
+  public int getMapSize() {
+    if (!parsed) {
+      parse();
+    }
+    return cachedMap.size();
+  }
+
+}
Index: serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyHBaseRow.java
===================================================================
--- serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyHBaseRow.java (revision 0)
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyHBaseRow.java (revision 0)
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.lazy;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+
+/**
+ * LazyObject for storing an hbase row.
+ * The fields of an hbase row can be primitive or non-primitive.
+ */
+public class LazyHBaseRow extends LazyStruct {
+
+  /**
+   * The hbase columns mapping of the hbase row.
+   */
+  List<String> hbaseColumns;
+  RowResult mRowResult;
+
+  /**
+   * Construct a LazyHBaseRow object with the ObjectInspector.
+   */
+  public LazyHBaseRow(LazySimpleStructObjectInspector oi) {
+    super(oi);
+  }
+
+  /**
+   * Set the hbase row data (a RowResult writable) for this LazyStruct.
+   * @see LazyHBaseRow#init(RowResult)
+   */
+  public void init(RowResult rr, List<String> hbaseColumns) {
+    this.mRowResult = rr;
+    this.hbaseColumns = hbaseColumns;
+    parsed = false;
+  }
+
+  /**
+   * Parse the RowResult and fill each field.
+   * @see LazyStruct#parse()
+   */
+  private void parse() {
+    if (fields == null) {
+      List<? extends StructField> fieldRefs = ((StructObjectInspector)oi).getAllStructFieldRefs();
+      fields = new LazyObject[fieldRefs.size()];
+      for (int i = 0; i < fields.length; i++) {
+        if (i > 0) {
+          String hbaseColumn = hbaseColumns.get(i - 1);
+          if (hbaseColumn.endsWith(":")) { // a column family
+            fields[i] =
+              new LazyHBaseCellMap((LazyMapObjectInspector) fieldRefs.get(i).getFieldObjectInspector());
+            continue;
+          }
+        }
+
+        fields[i] = LazyFactory.createLazyObject(fieldRefs.get(i).getFieldObjectInspector());
+      }
+      fieldInited = new boolean[fields.length];
+    }
+    Arrays.fill(fieldInited, false);
+    parsed = true;
+  }
+
+  /**
+   * Get one field out of the hbase row.
+   *
+   * If the field is a primitive field, return the actual object.
+   * Otherwise return the LazyObject. This is because PrimitiveObjectInspector
+   * does not have control over the object used by the user - the user simply
+   * uses the Object directly instead of going through
+   * Object PrimitiveObjectInspector.get(Object).
+   *
+   * @param fieldID The field ID
+   * @return The field as a LazyObject
+   */
+  public Object getField(int fieldID) {
+    if (!parsed) {
+      parse();
+    }
+    return uncheckedGetField(fieldID);
+  }
+
+  /**
+   * Get the field out of the row without checking parsed.
+   * This is called by both getField and getFieldsAsList.
+   * @param fieldID The id of the field starting from 0.
+ * @return The value of the field + */ + private Object uncheckedGetField(int fieldID) { + if (!fieldInited[fieldID]) { + fieldInited[fieldID] = true; + + ByteArrayRef ref = new ByteArrayRef(); + + if(fieldID == 0) { // the key + ref.setData(mRowResult.getRow()); + fields[fieldID].init(ref, 0, ref.data.length); + } else { + String columnName = hbaseColumns.get(fieldID - 1); + if(columnName.endsWith(":")) // it is a column family + ((LazyHBaseCellMap)fields[fieldID]).init(mRowResult, columnName); + else { // it is a column + if(mRowResult.containsKey(columnName)) { + ref.setData(mRowResult.get(columnName).getValue()); + fields[fieldID].init(ref, 0, ref.data.length); + } else { + return null; + } + } + } + } + return fields[fieldID].getObject(); + } + + ArrayList cachedList; + /** + * Get the values of the fields as an ArrayList. + * @return The values of the fields as an ArrayList. + */ + public ArrayList getFieldsAsList() { + if (!parsed) { + parse(); + } + if (cachedList == null) { + cachedList = new ArrayList(); + } else { + cachedList.clear(); + } + for (int i=0; i cells = new HbaseMapWritable(); + cells.put(colabyte, new Cell("123".getBytes(), 0)); + cells.put(colbshort, new Cell("456".getBytes(), 0)); + cells.put(colcint, new Cell("789".getBytes(), 0)); + cells.put(colalong, new Cell("1000".getBytes(), 0)); + cells.put(colbdouble, new Cell("5.3".getBytes(), 0)); + cells.put(colcstring, new Cell("hive and hadoop".getBytes(), 0)); + RowResult rr = new RowResult("test-row1".getBytes(), cells); + BatchUpdate bu = new BatchUpdate("test-row1".getBytes()); + bu.put(colabyte, "123".getBytes()); + bu.put(colbshort, "456".getBytes()); + bu.put(colcint, "789".getBytes()); + bu.put(colalong, "1000".getBytes()); + bu.put(colbdouble, "5.3".getBytes()); + bu.put(colcstring, "hive and hadoop".getBytes()); + + Object[] expectedFieldsData = { new Text("test-row1"), new ByteWritable((byte)123), + new ShortWritable((short)456), new IntWritable(789), + new LongWritable(1000), new DoubleWritable(5.3), new Text("hive and hadoop") + }; + + deserializeAndSerialize(serDe, rr, bu, expectedFieldsData); + } catch (Throwable e) { + e.printStackTrace(); + throw e; + } + } + + private void deserializeAndSerialize(HBaseSerDe serDe, RowResult rr, BatchUpdate bu, + Object[] expectedFieldsData) throws SerDeException { + // Get the row structure + StructObjectInspector oi = (StructObjectInspector)serDe.getObjectInspector(); + List fieldRefs = oi.getAllStructFieldRefs(); + assertEquals(7, fieldRefs.size()); + + // Deserialize + Object row = serDe.deserialize(rr); + for (int i = 0; i < fieldRefs.size(); i++) { + Object fieldData = oi.getStructFieldData(row, fieldRefs.get(i)); + if (fieldData != null) { + fieldData = ((LazyPrimitive)fieldData).getWritableObject(); + } + assertEquals("Field " + i, expectedFieldsData[i], fieldData); + } + // Serialize + assertEquals(BatchUpdate.class, serDe.getSerializedClass()); + BatchUpdate serializedBU = (BatchUpdate)serDe.serialize(row, oi); + assertEquals("Serialized data", bu.toString(), serializedBU.toString()); + } + + private Properties createProperties() { + Properties tbl = new Properties(); + + // Set the configuration parameters + tbl.setProperty(Constants.SERIALIZATION_FORMAT, "9"); + tbl.setProperty("columns", + "key,abyte,ashort,aint,along,adouble,astring"); + tbl.setProperty("columns.types", + "string,tinyint:smallint:int:bigint:double:string"); + tbl.setProperty("hbase.columns.mapping", + "cola:abyte,colb:ashort,colc:aint,cola:along,colb:adouble,colc:astring"); + return 
tbl; + } + +} Index: serde/src/test/org/apache/hadoop/hive/serde2/lazy/TestLazyHBaseObject.java =================================================================== --- serde/src/test/org/apache/hadoop/hive/serde2/lazy/TestLazyHBaseObject.java (revision 0) +++ serde/src/test/org/apache/hadoop/hive/serde2/lazy/TestLazyHBaseObject.java (revision 0) @@ -0,0 +1,249 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.serde2.lazy; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.hbase.io.Cell; +import org.apache.hadoop.hbase.io.HbaseMapWritable; +import org.apache.hadoop.hbase.io.RowResult; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; + +import junit.framework.TestCase; + +public class TestLazyHBaseObject extends TestCase { + + /** + * Test the LazyMap class. 
+   */
+  public void testLazyHBaseCellMap() throws Throwable {
+    try {
+      {
+        // Map of Integer to String
+        Text nullSequence = new Text("\\N");
+        ObjectInspector oi = LazyFactory.createLazyObjectInspector(
+            TypeInfoUtils.getTypeInfosFromTypeString("map<int,string>").get(0),
+            new byte[]{(byte)1, (byte)2}, 0, nullSequence, false, (byte)0);
+
+        LazyHBaseCellMap b = new LazyHBaseCellMap((LazyMapObjectInspector) oi);
+
+        // Initialize a row result
+        HbaseMapWritable<byte[], Cell> cells = new HbaseMapWritable<byte[], Cell>();
+        cells.put("cfa:col1".getBytes(), new Cell("cfacol1".getBytes(), 0));
+        cells.put("cfa:col2".getBytes(), new Cell("cfacol2".getBytes(), 0));
+        cells.put("cfb:2".getBytes(), new Cell("def".getBytes(), 0));
+        cells.put("cfb:-1".getBytes(), new Cell("".getBytes(), 0));
+        cells.put("cfb:0".getBytes(), new Cell("0".getBytes(), 0));
+        cells.put("cfb:8".getBytes(), new Cell("abc".getBytes(), 0));
+        cells.put("cfc:col3".getBytes(), new Cell("cfccol3".getBytes(), 0));
+
+        RowResult rr = new RowResult("test-row".getBytes(), cells);
+
+        b.init(rr, "cfb:");
+
+        assertEquals(new Text("def"), ((LazyString)b.getMapValueElement(new IntWritable(2))).getWritableObject());
+        assertNull(b.getMapValueElement(new IntWritable(-1)));
+        assertEquals(new Text("0"), ((LazyString)b.getMapValueElement(new IntWritable(0))).getWritableObject());
+        assertEquals(new Text("abc"), ((LazyString)b.getMapValueElement(new IntWritable(8))).getWritableObject());
+        assertNull(b.getMapValueElement(new IntWritable(12345)));
+
+        assertEquals("{0:'0',2:'def',8:'abc'}".replace('\'', '\"'),
+            SerDeUtils.getJSONString(b, oi));
+      }
+
+      {
+        // Map of String to String
+        Text nullSequence = new Text("\\N");
+        ObjectInspector oi = LazyFactory.createLazyObjectInspector(
+            TypeInfoUtils.getTypeInfosFromTypeString("map<string,string>").get(0),
+            new byte[]{(byte)'#', (byte)'\t'}, 0, nullSequence, false, (byte)0);
+
+        LazyHBaseCellMap b = new LazyHBaseCellMap((LazyMapObjectInspector) oi);
+
+        // Initialize a row result
+        HbaseMapWritable<byte[], Cell> cells = new HbaseMapWritable<byte[], Cell>();
+        cells.put("cfa:col1".getBytes(), new Cell("cfacol1".getBytes(), 0));
+        cells.put("cfa:col2".getBytes(), new Cell("cfacol2".getBytes(), 0));
+        cells.put("cfb:2".getBytes(), new Cell("d\tf".getBytes(), 0));
+        cells.put("cfb:-1".getBytes(), new Cell("".getBytes(), 0));
+        cells.put("cfb:0".getBytes(), new Cell("0".getBytes(), 0));
+        cells.put("cfb:8".getBytes(), new Cell("abc".getBytes(), 0));
+        cells.put("cfc:col3".getBytes(), new Cell("cfccol3".getBytes(), 0));
+
+        RowResult rr = new RowResult("test-row".getBytes(), cells);
+
+        b.init(rr, "cfb:");
+
+        assertEquals(new Text("d\tf"), ((LazyString)b.getMapValueElement(new Text("2"))).getWritableObject());
+        assertNull(b.getMapValueElement(new Text("-1")));
+        assertEquals(new Text("0"), ((LazyString)b.getMapValueElement(new Text("0"))).getWritableObject());
+        assertEquals(new Text("abc"), ((LazyString)b.getMapValueElement(new Text("8"))).getWritableObject());
+        assertNull(b.getMapValueElement(new Text("-")));
+
+        assertEquals("{'0':'0','2':'d\\tf','8':'abc'}".replace('\'', '\"'),
+            SerDeUtils.getJSONString(b, oi));
+      }
+
+    } catch (Throwable e) {
+      e.printStackTrace();
+      throw e;
+    }
+  }
+
+  /**
+   * Test the LazyHBaseRow class.
+   */
+  public void testLazyHBaseRow() throws Throwable {
+    try {
+      {
+        ArrayList<TypeInfo> fieldTypeInfos =
+          TypeInfoUtils.getTypeInfosFromTypeString("string,int,array<string>,map<string,string>,string");
+        List<String> fieldNames = Arrays.asList(new String[]{"key", "a", "b", "c", "d"});
+        Text nullSequence = new Text("\\N");
+
+        List<String> hbaseColumnNames =
+          Arrays.asList(new String[]{"cfa:a", "cfa:b", "cfb:c", "cfb:d"});
+
+        ObjectInspector oi = LazyFactory.createLazyStructInspector(fieldNames,
+            fieldTypeInfos, new byte[] {' ', ':', '='}, nullSequence, false, false, (byte)0);
+        LazyHBaseRow o = new LazyHBaseRow((LazySimpleStructObjectInspector) oi);
+
+        HbaseMapWritable<byte[], Cell> cells = new HbaseMapWritable<byte[], Cell>();
+
+        cells.put("cfa:a".getBytes(), new Cell("123".getBytes(), 0));
+        cells.put("cfa:b".getBytes(), new Cell("a:b:c".getBytes(), 0));
+        cells.put("cfb:c".getBytes(), new Cell("d=e:f=g".getBytes(), 0));
+        cells.put("cfb:d".getBytes(), new Cell("hi".getBytes(), 0));
+        RowResult rr = new RowResult("test-row".getBytes(), cells);
+        o.init(rr, hbaseColumnNames);
+        assertEquals("{'key':'test-row','a':123,'b':['a','b','c'],'c':{'d':'e','f':'g'},'d':'hi'}".replace("'", "\""),
+            SerDeUtils.getJSONString(o, oi));
+
+        cells.clear();
+        cells.put("cfa:a".getBytes(), new Cell("123".getBytes(), 0));
+        cells.put("cfb:c".getBytes(), new Cell("d=e:f=g".getBytes(), 0));
+        rr = new RowResult("test-row".getBytes(), cells);
+        o.init(rr, hbaseColumnNames);
+        assertEquals("{'key':'test-row','a':123,'b':null,'c':{'d':'e','f':'g'},'d':null}".replace("'", "\""),
+            SerDeUtils.getJSONString(o, oi));
+
+        cells.clear();
+        cells.put("cfa:b".getBytes(), new Cell("a".getBytes(), 0));
+        cells.put("cfb:c".getBytes(), new Cell("d=\\N:f=g:h".getBytes(), 0));
+        cells.put("cfb:d".getBytes(), new Cell("no".getBytes(), 0));
+        rr = new RowResult("test-row".getBytes(), cells);
+        o.init(rr, hbaseColumnNames);
+        assertEquals("{'key':'test-row','a':null,'b':['a'],'c':{'d':null,'f':'g','h':null},'d':'no'}".replace("'", "\""),
+            SerDeUtils.getJSONString(o, oi));
+
+        cells.clear();
+        cells.put("cfa:b".getBytes(), new Cell(":a::".getBytes(), 0));
+        cells.put("cfb:d".getBytes(), new Cell("no".getBytes(), 0));
+        rr = new RowResult("test-row".getBytes(), cells);
+        o.init(rr, hbaseColumnNames);
+        assertEquals("{'key':'test-row','a':null,'b':['','a','',''],'c':null,'d':'no'}".replace("'", "\""),
+            SerDeUtils.getJSONString(o, oi));
+
+        cells.clear();
+        cells.put("cfa:a".getBytes(), new Cell("123".getBytes(), 0));
+        cells.put("cfa:b".getBytes(), new Cell("".getBytes(), 0));
+        cells.put("cfb:c".getBytes(), new Cell("".getBytes(), 0));
+        cells.put("cfb:d".getBytes(), new Cell("".getBytes(), 0));
+        rr = new RowResult("test-row".getBytes(), cells);
+        o.init(rr, hbaseColumnNames);
+        assertEquals("{'key':'test-row','a':123,'b':[],'c':{},'d':''}".replace("'", "\""),
+            SerDeUtils.getJSONString(o, oi));
+      }
+
+      // the "cfb:" column family is mapped to a map<string,string> column
+      {
+        ArrayList<TypeInfo> fieldTypeInfos =
+          TypeInfoUtils.getTypeInfosFromTypeString("string,int,array<string>,map<string,string>,string");
+        List<String> fieldNames = Arrays.asList(new String[]{"key", "a", "b", "c", "d"});
+        Text nullSequence = new Text("\\N");
+
+        List<String> hbaseColumnNames =
+          Arrays.asList(new String[]{"cfa:a", "cfa:b", "cfb:", "cfc:d"});
+
+        ObjectInspector oi = LazyFactory.createLazyStructInspector(fieldNames,
+            fieldTypeInfos, new byte[] {' ', ':', '='}, nullSequence, false, false, (byte)0);
+        LazyHBaseRow o = new LazyHBaseRow((LazySimpleStructObjectInspector) oi);
+
+        HbaseMapWritable<byte[], Cell> cells = new HbaseMapWritable<byte[], Cell>();
+
+        cells.put("cfa:a".getBytes(), new Cell("123".getBytes(), 0));
+ cells.put("cfa:b".getBytes(), new Cell("a:b:c".getBytes(), 0)); + cells.put("cfb:d".getBytes(), new Cell("e".getBytes(), 0)); + cells.put("cfb:f".getBytes(), new Cell("g".getBytes(), 0)); + cells.put("cfc:d".getBytes(), new Cell("hi".getBytes(), 0)); + RowResult rr = new RowResult("test-row".getBytes(), cells); + o.init(rr, hbaseColumnNames); + assertEquals("{'key':'test-row','a':123,'b':['a','b','c'],'c':{'d':'e','f':'g'},'d':'hi'}".replace("'", "\""), + SerDeUtils.getJSONString(o, oi)); + + cells.clear(); + cells.put("cfa:a".getBytes(), new Cell("123".getBytes(), 0)); + cells.put("cfb:d".getBytes(), new Cell("e".getBytes(), 0)); + cells.put("cfb:f".getBytes(), new Cell("g".getBytes(), 0)); + rr = new RowResult("test-row".getBytes(), cells); + o.init(rr, hbaseColumnNames); + assertEquals("{'key':'test-row','a':123,'b':null,'c':{'d':'e','f':'g'},'d':null}".replace("'", "\""), + SerDeUtils.getJSONString(o, oi)); + + cells.clear(); + cells.put("cfa:b".getBytes(), new Cell("a".getBytes(), 0)); + cells.put("cfb:f".getBytes(), new Cell("g".getBytes(), 0)); + cells.put("cfc:d".getBytes(), new Cell("no".getBytes(), 0)); + rr = new RowResult("test-row".getBytes(), cells); + o.init(rr, hbaseColumnNames); + assertEquals("{'key':'test-row','a':null,'b':['a'],'c':{'f':'g'},'d':'no'}".replace("'", "\""), + SerDeUtils.getJSONString(o, oi)); + + cells.clear(); + cells.put("cfa:b".getBytes(), new Cell(":a::".getBytes(), 0)); + cells.put("cfc:d".getBytes(), new Cell("no".getBytes(), 0)); + rr = new RowResult("test-row".getBytes(), cells); + o.init(rr, hbaseColumnNames); + assertEquals("{'key':'test-row','a':null,'b':['','a','',''],'c':{},'d':'no'}".replace("'", "\""), + SerDeUtils.getJSONString(o, oi)); + + cells.clear(); + cells.put("cfa:a".getBytes(), new Cell("123".getBytes(), 0)); + cells.put("cfa:b".getBytes(), new Cell("".getBytes(), 0)); + cells.put("cfc:d".getBytes(), new Cell("".getBytes(), 0)); + rr = new RowResult("test-row".getBytes(), cells); + o.init(rr, hbaseColumnNames); + assertEquals("{'key':'test-row','a':123,'b':[],'c':{},'d':''}".replace("'", "\""), + SerDeUtils.getJSONString(o, oi)); + } + } catch (Throwable e) { + e.printStackTrace(); + throw e; + } + } + +}