Index: ant/src/org/apache/hadoop/hive/ant/QTestGenTask.java
===================================================================
--- ant/src/org/apache/hadoop/hive/ant/QTestGenTask.java (revision 800178)
+++ ant/src/org/apache/hadoop/hive/ant/QTestGenTask.java (working copy)
@@ -68,6 +68,16 @@
protected String logFile;
protected String clusterMode;
+
+ protected String miniHBase;
+
+ public void setMiniHBase(String miniHBase) {
+ this.miniHBase = miniHBase;
+ }
+
+ public String getMiniHBase() {
+ return miniHBase;
+ }
public void setClusterMode(String clusterMode) {
this.clusterMode = clusterMode;
@@ -262,6 +272,9 @@
if (clusterMode == null)
clusterMode = new String("");
+
+ if (miniHBase == null)
+ miniHBase = new String("false");
// For each of the qFiles generate the test
VelocityContext ctx = new VelocityContext();
@@ -270,6 +283,7 @@
ctx.put("resultsDir", resultsDir);
ctx.put("logDir", logDir);
ctx.put("clusterMode", clusterMode);
+ ctx.put("miniHBase", miniHBase);
File outFile = new File(outDir, className + ".java");
FileWriter writer = new FileWriter(outFile);
Index: build-common.xml
===================================================================
--- build-common.xml (revision 800178)
+++ build-common.xml (working copy)
@@ -135,6 +135,7 @@
+
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 800178)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -30,6 +30,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.UserGroupInformation;
@@ -41,6 +42,7 @@
public class HiveConf extends Configuration {
protected String hiveJar;
+ protected String hbaseJar;
protected Properties origProp;
protected String auxJars;
private static final Log l4j = LogFactory.getLog(HiveConf.class);
@@ -75,6 +77,9 @@
HADOOPJT("mapred.job.tracker", "local"),
HADOOPNUMREDUCERS("mapred.reduce.tasks", 1),
HADOOPJOBNAME("mapred.job.name", null),
+
+ // hbase stuff
+ HBASEMASTER("hbase.master", "localhost:60000"),
// MetaStore stuff.
METASTOREDIRECTORY("hive.metastore.metadb.dir", ""),
@@ -310,6 +315,16 @@
// preserve the original configuration
origProp = getUnderlyingProps();
+ // try to add the hbase configuration
+ URL hbaseconfurl = getClassLoader().getResource("hbase-default.xml");
+ if (hbaseconfurl != null) {
+ addResource(hbaseconfurl);
+ }
+ URL hbasesiteurl = getClassLoader().getResource("hbase-site.xml");
+ if (hbasesiteurl != null) {
+ addResource(hbasesiteurl);
+ }
+
// let's add the hive configuration
URL hconfurl = getClassLoader().getResource("hive-default.xml");
if(hconfurl == null) {
@@ -345,6 +360,15 @@
if(auxJars == null) {
auxJars = this.get(ConfVars.HIVEAUXJARS.varname);
}
+
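+ // locate the jar containing the HBase classes and prepend it to the aux jars so it ships with MapReduce jobs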
+ hbaseJar = (new JobConf(HBaseConfiguration.class)).getJar();
+ if (hbaseJar != null) {
+ if (auxJars == null)
+ auxJars = hbaseJar;
+ else
+ auxJars = hbaseJar + "," + auxJars;
+ this.setAuxJars(auxJars);
+ }
}
Index: metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (revision 800178)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (working copy)
@@ -26,6 +26,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.generated.master.table_jsp;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Database;
@@ -293,21 +294,27 @@
}
tblPath = wh.getDnsPath(new Path(tbl.getSd().getLocation()));
}
-
tbl.getSd().setLocation(tblPath.toString());
// get_table checks whether database exists, it should be moved here
if(is_table_exists(tbl.getDbName(), tbl.getTableName())) {
throw new AlreadyExistsException("Table " + tbl.getTableName() + " already exists");
}
-
- if(!wh.isDir(tblPath)) {
- if(!wh.mkdirs(tblPath)) {
- throw new MetaException (tblPath + " is not a directory or unable to create one");
+
+ // Only create the warehouse directory for file-backed tables; an HBase-backed table has none.
+ if (!(tbl.getSd().getInputFormat() != null &&
+ tbl.getSd().getInputFormat().equals(
+ "org.apache.hadoop.hive.ql.io.HiveHBaseTableInputFormat"))) {
+
+ if(!wh.isDir(tblPath)) {
+ if(!wh.mkdirs(tblPath)) {
+ throw new MetaException (tblPath + " is not a directory or unable to create one");
+ }
+ madeDir = true;
}
- madeDir = true;
+
}
-
+
getMS().createTable(tbl);
success = getMS().commitTransaction();
Index: ql/build.xml
===================================================================
--- ql/build.xml (revision 800178)
+++ ql/build.xml (working copy)
@@ -76,11 +76,22 @@
queryDirectory="${ql.test.query.clientpositive.dir}"
queryFile="${qfile}"
clusterMode="${clustermode}"
+ miniHBase="false"
resultsDirectory="${ql.test.results.clientpositive.dir}" className="TestCliDriver"
logFile="${test.log.dir}/testclidrivergen.log"
logDirectory="${test.log.dir}/clientpositive"/>
+
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (revision 800178)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (working copy)
+ outFileFormat = (Class<? extends HiveOutputFormat>)(HiveSequenceFileOutputFormat.class);
- String newFile = hiveScratchDir + File.separator + (++numEmptyPaths);
- Path newPath = new Path(newFile);
- LOG.info("Changed input file to " + newPath.toString());
-
- // toggle the work
- LinkedHashMap<String, ArrayList<String>> pathToAliases = work.getPathToAliases();
- if (isEmptyPath) {
- assert path != null;
- pathToAliases.put(newPath.toUri().toString(), pathToAliases.get(path));
- pathToAliases.remove(path);
+ if (outFileFormat == HiveHBaseTableOutputFormat.class) {
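+ // an HBase-backed alias has no files on HDFS, so register its pseudo-path directly instead of creating an empty placeholder file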
+ FileInputFormat.addInputPaths(job, path);
+ LOG.info("Add a hbase table " + path);
+ } else {
+ String newFile = hiveScratchDir + File.separator + (++numEmptyPaths);
+ Path newPath = new Path(newFile);
+ LOG.info("Changed input file to " + newPath.toString());
+
+ // toggle the work
+ LinkedHashMap<String, ArrayList<String>> pathToAliases = work.getPathToAliases();
+ if (isEmptyPath) {
+ assert path != null;
+ pathToAliases.put(newPath.toUri().toString(), pathToAliases.get(path));
+ pathToAliases.remove(path);
+ }
+ else {
+ assert path == null;
+ ArrayList<String> newList = new ArrayList<String>();
+ newList.add(alias);
+ pathToAliases.put(newPath.toUri().toString(), newList);
+ }
+
+ work.setPathToAliases(pathToAliases);
+
+ LinkedHashMap<String, partitionDesc> pathToPartitionInfo = work.getPathToPartitionInfo();
+ if (isEmptyPath) {
+ pathToPartitionInfo.put(newPath.toUri().toString(), pathToPartitionInfo.get(path));
+ pathToPartitionInfo.remove(path);
+ }
+ else {
+ partitionDesc pDesc = work.getAliasToPartnInfo().get(alias).clone();
+ Class<? extends InputFormat> inputFormat = SequenceFileInputFormat.class;
+ pDesc.getTableDesc().setInputFileFormatClass(inputFormat);
+ pathToPartitionInfo.put(newPath.toUri().toString(), pDesc);
+ }
+ work.setPathToPartitionInfo(pathToPartitionInfo);
+
+ String onefile = newPath.toString();
+ RecordWriter recWriter = outFileFormat.newInstance().getHiveRecordWriter(job, newPath, Text.class, false, new Properties(), null);
+ recWriter.close(false);
+ FileInputFormat.addInputPaths(job, onefile);
}
- else {
- assert path == null;
- ArrayList<String> newList = new ArrayList<String>();
- newList.add(alias);
- pathToAliases.put(newPath.toUri().toString(), newList);
- }
-
- work.setPathToAliases(pathToAliases);
-
- LinkedHashMap<String, partitionDesc> pathToPartitionInfo = work.getPathToPartitionInfo();
- if (isEmptyPath) {
- pathToPartitionInfo.put(newPath.toUri().toString(), pathToPartitionInfo.get(path));
- pathToPartitionInfo.remove(path);
- }
- else {
- partitionDesc pDesc = work.getAliasToPartnInfo().get(alias).clone();
- Class<? extends InputFormat> inputFormat = SequenceFileInputFormat.class;
- pDesc.getTableDesc().setInputFileFormatClass(inputFormat);
- pathToPartitionInfo.put(newPath.toUri().toString(), pDesc);
- }
- work.setPathToPartitionInfo(pathToPartitionInfo);
-
- String onefile = newPath.toString();
- RecordWriter recWriter = outFileFormat.newInstance().getHiveRecordWriter(job, newPath, Text.class, false, new Properties(), null);
- recWriter.close(false);
- FileInputFormat.addInputPaths(job, onefile);
return numEmptyPaths;
}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (revision 800178)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (working copy)
@@ -27,6 +27,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.io.HiveHBaseTableOutputFormat;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.fileSinkDesc;
@@ -56,7 +57,8 @@
transient protected Serializer serializer;
transient protected BytesWritable commonKey = new BytesWritable();
transient protected TableIdEnum tabIdEnum = null;
- transient private LongWritable row_count;
+ transient private LongWritable row_count;
+ transient private boolean isHBaseTable;
public static enum TableIdEnum {
TABLE_ID_1_ROWCOUNT, TABLE_ID_2_ROWCOUNT, TABLE_ID_3_ROWCOUNT, TABLE_ID_4_ROWCOUNT, TABLE_ID_5_ROWCOUNT, TABLE_ID_6_ROWCOUNT, TABLE_ID_7_ROWCOUNT, TABLE_ID_8_ROWCOUNT, TABLE_ID_9_ROWCOUNT, TABLE_ID_10_ROWCOUNT, TABLE_ID_11_ROWCOUNT, TABLE_ID_12_ROWCOUNT, TABLE_ID_13_ROWCOUNT, TABLE_ID_14_ROWCOUNT, TABLE_ID_15_ROWCOUNT;
@@ -65,10 +67,14 @@
transient protected boolean autoDelete = false;
private void commit() throws IOException {
- if (!fs.rename(outPath, finalPath)) {
- throw new IOException ("Unable to rename output to: " + finalPath);
+ if(!isHBaseTable) {
+ if (!fs.rename(outPath, finalPath)) {
+ throw new IOException ("Unable to rename output to: " + finalPath);
+ }
+ LOG.info("Committed to output file: " + finalPath);
+ } else {
+ LOG.info("Committed to HBase Table: " + finalPath.getName());
}
- LOG.info("Committed to output file: " + finalPath);
}
protected void initializeOp(Configuration hconf) throws HiveException {
@@ -76,6 +82,7 @@
serializer = (Serializer)conf.getTableInfo().getDeserializerClass().newInstance();
serializer.initialize(null, conf.getTableInfo().getProperties());
+ isHBaseTable = conf.getTableInfo().getOutputFileFormatClass() == HiveHBaseTableOutputFormat.class;
JobConf jc;
if(hconf instanceof JobConf) {
@@ -97,11 +104,15 @@
Path tmpPath = Utilities.toTempPath(specPath);
String taskId = Utilities.getTaskId(hconf);
fs =(new Path(specPath)).getFileSystem(hconf);
- finalPath = new Path(tmpPath, taskId);
- outPath = new Path(tmpPath, Utilities.toTempPath(taskId));
+ if (isHBaseTable) {
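+ // rows go directly into the HBase table named by specPath; no temp file or rename is involved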
+ outPath = finalPath = new Path(specPath);
+ LOG.info("Writing to HBase table: FS " + outPath.getName());
+ } else {
+ finalPath = new Path(tmpPath, taskId);
+ outPath = new Path(tmpPath, Utilities.toTempPath(taskId));
+ LOG.info("Writing to temp file: FS " + outPath);
+ }
- LOG.info("Writing to temp file: FS " + outPath);
-
HiveOutputFormat<?, ?> hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance();
final Class<? extends Writable> outputClass = serializer.getSerializedClass();
boolean isCompressed = conf.getCompressed();
@@ -116,13 +127,15 @@
outWriter = getRecordWriter(jc, hiveOutputFormat, outputClass, isCompressed, tableInfo.getProperties(), outPath);
- // in recent hadoop versions, use deleteOnExit to clean tmp files.
- try {
- Method deleteOnExit = FileSystem.class.getDeclaredMethod("deleteOnExit", new Class [] {Path.class});
- deleteOnExit.setAccessible(true);
- deleteOnExit.invoke(fs, outPath);
- autoDelete = true;
- } catch (Exception e) {}
+ if (!isHBaseTable) {
+ // in recent hadoop versions, use deleteOnExit to clean tmp files.
+ try {
+ Method deleteOnExit = FileSystem.class.getDeclaredMethod("deleteOnExit", new Class [] {Path.class});
+ deleteOnExit.setAccessible(true);
+ deleteOnExit.invoke(fs, outPath);
+ autoDelete = true;
+ } catch (Exception e) {}
+ }
initializeChildren(hconf);
} catch (HiveException e) {
@@ -180,7 +193,7 @@
// Hadoop always call close() even if an Exception was thrown in map() or reduce().
try {
outWriter.close(abort);
- if(!autoDelete)
+ if(!autoDelete && !isHBaseTable)
fs.delete(outPath, true);
} catch (Exception e) {
e.printStackTrace();
@@ -198,7 +211,7 @@
@Override
public void jobClose(Configuration hconf, boolean success) throws HiveException {
try {
- if(conf != null) {
+ if(conf != null && !isHBaseTable) {
String specPath = conf.getDirName();
fs = (new Path(specPath)).getFileSystem(hconf);
Path tmpPath = Utilities.toTempPath(specPath);
Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveHBaseTableInputFormat.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveHBaseTableInputFormat.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveHBaseTableInputFormat.java (revision 0)
@@ -0,0 +1,43 @@
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+public class HiveHBaseTableInputFormat
+ implements InputFormat<ImmutableBytesWritable, RowResult>, JobConfigurable {
+
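+ // thin wrapper that delegates everything to HBase's TableInputFormat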
+ org.apache.hadoop.hbase.mapred.TableInputFormat mInputFormat;
+
+ public HiveHBaseTableInputFormat() {
+ mInputFormat = new org.apache.hadoop.hbase.mapred.TableInputFormat();
+ }
+
+ @Override
+ public RecordReader<ImmutableBytesWritable, RowResult> getRecordReader(InputSplit split, JobConf job,
+ Reporter reporter) throws IOException {
+ return (RecordReader<ImmutableBytesWritable, RowResult>) mInputFormat.getRecordReader(split, job, reporter);
+ }
+
+ @Override
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ return mInputFormat.getSplits(job, numSplits);
+ }
+
+ @Override
+ public void configure(JobConf job) {
+ mInputFormat.configure(job);
+ }
+
+ public void validateInput(JobConf job) throws IOException {
+ mInputFormat.validateInput(job);
+ }
+
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveHBaseTableOutputFormat.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveHBaseTableOutputFormat.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveHBaseTableOutputFormat.java (revision 0)
@@ -0,0 +1,63 @@
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.TableOutputFormat;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.Progressable;
+
+public class HiveHBaseTableOutputFormat extends
+ TableOutputFormat implements
+ HiveOutputFormat<ImmutableBytesWritable, BatchUpdate> {
+
+ ImmutableBytesWritable KEY = new ImmutableBytesWritable();
+
+ /**
+ * Get a RecordWriter that writes each row as a BatchUpdate into the target HBase table.
+ *
+ * @param jc
+ * the job configuration file
+ * @param finalOutPath
+ * the final output table name
+ * @param valueClass
+ * the value class used for create
+ * @param isCompressed
+ * whether the content is compressed or not
+ * @param tableProperties
+ * the tableInfo of this file's corresponding table
+ * @param progress
+ * progress used for status report
+ * @return the RecordWriter for the output file
+ */
+ @Override
+ public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+ Class<? extends Writable> valueClass, boolean isCompressed,
+ Properties tableProperties, Progressable progress) throws IOException {
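+ // the HBase table name is carried in finalOutPath; hand it to TableOutputFormat through the job conf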
+ jc.set(TableOutputFormat.OUTPUT_TABLE, finalOutPath.getName());
+
+ final org.apache.hadoop.mapred.RecordWriter tblWriter =
+ this.getRecordWriter(null, jc, null, progress);
+ return new RecordWriter() {
+
+ @Override
+ public void close(boolean abort) throws IOException {
+ tblWriter.close(null);
+ }
+
+ @Override
+ public void write(Writable w) throws IOException {
+ BatchUpdate bu = (BatchUpdate)w;
+ KEY.set(bu.getRow());
+ tblWriter.write(KEY, bu);
+ }
+
+ };
+ }
+
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (revision 800178)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (working copy)
@@ -36,12 +36,16 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.mapred.TableInputFormat;
+import org.apache.hadoop.hbase.mapred.TableSplit;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.plan.mapredWork;
import org.apache.hadoop.hive.ql.plan.tableDesc;
import org.apache.hadoop.hive.ql.plan.partitionDesc;
+import org.apache.hadoop.hive.serde2.hbase.HBaseSerDe;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
@@ -97,6 +101,8 @@
public Path getPath() {
if (inputSplit instanceof FileSplit) {
return ((FileSplit)inputSplit).getPath();
+ } else if (inputSplit instanceof TableSplit) {
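+ // an HBase split has no file path; expose the table name as a pseudo-path so path-keyed lookups (e.g. pathToPartitionInfo) still resolve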
+ return new Path("/" + Bytes.toString(((TableSplit)inputSplit).getTableName()));
}
return new Path("");
}
@@ -179,6 +185,7 @@
(InputFormat)ReflectionUtils.newInstance(inputFormatClass, job);
inputFormats.put(inputFormatClass, newInstance);
} catch (Exception e) {
+ e.printStackTrace();
throw new IOException("Cannot create an instance of InputFormat class " + inputFormatClass.getName()
+ " as specified in mapredWork!");
}
@@ -258,14 +265,21 @@
// for each dir, get the InputFormat, and do getSplits.
for(Path dir: dirs) {
+ FileInputFormat.setInputPaths(newjob, dir);
tableDesc table = getTableDescFromPath(dir);
+ if (table.getSerdeClassName() != null &&
+ table.getSerdeClassName().equals(HBaseSerDe.class.getName())) {
+ String hbaseColumns = (String) table.getProperties().get("hbase.columns.mapping");
+ hbaseColumns = hbaseColumns.replace(',', ' ');
+ newjob.set(TableInputFormat.COLUMN_LIST, hbaseColumns);
+ }
+
// create a new InputFormat instance if this is the first time to see this class
Class inputFormatClass = table.getInputFileFormatClass();
- InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
-
- FileInputFormat.setInputPaths(newjob, dir);
+ InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, newjob);
+
newjob.setInputFormat(inputFormat.getClass());
- InputSplit[] iss = inputFormat.getSplits(newjob, numSplits/dirs.length);
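+ // request at least one split per directory; numSplits/dirs.length can round down to zero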
+ InputSplit[] iss = inputFormat.getSplits(newjob, (numSplits/dirs.length == 0) ? 1 : numSplits/dirs.length);
for(InputSplit is: iss) {
result.add(new HiveInputSplit(is, inputFormatClass.getName()));
}
Index: ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (revision 800178)
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (working copy)
@@ -20,18 +20,30 @@
import java.io.IOException;
import java.net.URI;
+import java.net.URISyntaxException;
import java.util.AbstractMap;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.derby.iapi.sql.dictionary.TableDescriptor;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.metastore.HiveMetaException;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -249,6 +261,8 @@
tbl.setFields(MetaStoreUtils.getFieldsFromDeserializer(tbl.getName(), tbl.getDeserializer()));
}
tbl.checkValidity();
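+ // for an HBase-backed table, make sure the underlying HBase table exists (creating it for managed tables) before registering it in the metastore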
+ if (tbl.getInputFormatClass() == org.apache.hadoop.hive.ql.io.HiveHBaseTableInputFormat.class)
+ validateAndCreateHBaseTableIfNeeded(tbl);
getMSC().createTable(tbl.getTTable());
} catch (AlreadyExistsException e) {
if (!ifNotExists) {
@@ -258,7 +272,63 @@
throw new HiveException(e);
}
}
+
+ private void validateAndCreateHBaseTableIfNeeded(Table tbl)
+ throws URISyntaxException, HiveException, IOException {
+ String location = tbl.getTTable().getSd().getLocation();
+ if (location == null) { // no HBase location specified; default to the table name
+ location = tbl.getTTable().getTableName();
+ }
+ // make the location absolute
+ tbl.getTTable().getSd().setLocation("/"+location);
+
+ // Build the mapping schema
+ Set<String> columnFamilies = new HashSet<String>();
+ // Check the hbase columns and get all the families
+ String hbaseColumnStr = tbl.getSerdeParam("hbase.columns.mapping");
+ if (hbaseColumnStr == null)
+ throw new HiveException("No schema mapping defined in Serde.");
+ String[] hbaseColumns = hbaseColumnStr.split(",");
+ for(String hbaseColumn : hbaseColumns) {
+ int idx = hbaseColumn.indexOf(":");
+ if (idx < 0)
+ throw new HiveException(hbaseColumn + " is not a qualified hbase column.");
+ columnFamilies.add(hbaseColumn.substring(0, idx + 1));
+ }
+ // Check whether the given HBase table exists
+ HBaseConfiguration hbaseConf = new HBaseConfiguration(conf);
+ HBaseAdmin admin = new HBaseAdmin(hbaseConf);
+ HTableDescriptor tblDesc;
+
+ if (!admin.tableExists(location)) {
+
+ String externalStr = tbl.getProperty("EXTERNAL");
+ // not an external table, so create it
+ if (externalStr == null || !externalStr.equals("TRUE")) {
+ // Create all the column descriptors
+ tblDesc = new HTableDescriptor(location);
+ for (String cf : columnFamilies) {
+ tblDesc.addFamily(new HColumnDescriptor(cf));
+ }
+
+ admin.createTable(tblDesc);
+ } else { // an external table
+ throw new HiveException("HBase table " + location +
+ " doesn't exist while the table is declared as an external table.");
+ }
+
+ } else { // make sure the schema mapping is right
+ tblDesc = admin.getTableDescriptor(location);
+ for (String cf : columnFamilies) {
+ if(!tblDesc.hasFamily(Bytes.toBytes(cf)))
+ throw new HiveException("Column Family " + cf + " is not defined in hbase table " + location);
+ }
+ }
+ // ensure the table is online
+ new HTable(hbaseConf, tblDesc.getName());
+ }
+
/**
* Drops table along with the data in it. If the table doesn't exist
* then it is a no-op
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java (revision 800178)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java (working copy)
@@ -37,11 +37,12 @@
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Order;
-import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
import org.apache.hadoop.hive.ql.plan.MsckDesc;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.io.HiveHBaseTableInputFormat;
+import org.apache.hadoop.hive.ql.io.HiveHBaseTableOutputFormat;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
@@ -61,7 +62,6 @@
import org.apache.hadoop.hive.ql.plan.fetchWork;
import org.apache.hadoop.hive.ql.plan.tableDesc;
import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
-import org.apache.hadoop.hive.serde2.dynamic_type.DynamicSerDe;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.hive.ql.exec.Task;
@@ -91,6 +91,8 @@
private static final String SEQUENCEFILE_OUTPUT = SequenceFileOutputFormat.class.getName();
private static final String RCFILE_INPUT = RCFileInputFormat.class.getName();
private static final String RCFILE_OUTPUT = RCFileOutputFormat.class.getName();
+ private static final String HBASETABLE_INPUT = HiveHBaseTableInputFormat.class.getName();
+ private static final String HBASETABLE_OUTPUT = HiveHBaseTableOutputFormat.class.getName();
private static final String COLUMNAR_SERDE = ColumnarSerDe.class.getName();
@@ -263,6 +265,10 @@
outputFormat = RCFILE_OUTPUT;
serde = COLUMNAR_SERDE;
break;
+ case HiveParser.TOK_TBLHBASETABLE:
+ inputFormat = HBASETABLE_INPUT;
+ outputFormat = HBASETABLE_OUTPUT;
+ break;
case HiveParser.TOK_TABLEFILEFORMAT:
inputFormat = unescapeSQLString(child.getChild(0).getText());
outputFormat = unescapeSQLString(child.getChild(1).getText());
@@ -273,6 +279,12 @@
default: assert false;
}
}
+
+ if (inputFormat == HBASETABLE_INPUT && serde == null) {
+ throw new SemanticException("Should specify the serde during creating a hbased-hive table!" +
+ "Because you need to specify the column mapping from hbase table to hive table.");
+ }
+
if (likeTableName == null) {
createTableDesc crtTblDesc =
new createTableDesc(tableName, isExt, cols, partCols, bucketCols,
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g (revision 800178)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g (working copy)
@@ -108,6 +108,7 @@
TOK_TBLSEQUENCEFILE;
TOK_TBLTEXTFILE;
TOK_TBLRCFILE;
+TOK_TBLHBASETABLE;
TOK_TABLEFILEFORMAT;
TOK_TABCOLNAME;
TOK_TABLELOCATION;
@@ -429,6 +430,7 @@
KW_STORED KW_AS KW_SEQUENCEFILE -> TOK_TBLSEQUENCEFILE
| KW_STORED KW_AS KW_TEXTFILE -> TOK_TBLTEXTFILE
| KW_STORED KW_AS KW_RCFILE -> TOK_TBLRCFILE
+ | KW_STORED KW_AS KW_HBASETABLE -> TOK_TBLHBASETABLE
| KW_STORED KW_AS KW_INPUTFORMAT inFmt=StringLiteral KW_OUTPUTFORMAT outFmt=StringLiteral
-> ^(TOK_TABLEFILEFORMAT $inFmt $outFmt)
;
@@ -1210,6 +1212,7 @@
KW_SEQUENCEFILE: 'SEQUENCEFILE';
KW_TEXTFILE: 'TEXTFILE';
KW_RCFILE: 'RCFILE';
+KW_HBASETABLE: 'HBASETABLE';
KW_INPUTFORMAT: 'INPUTFORMAT';
KW_OUTPUTFORMAT: 'OUTPUTFORMAT';
KW_LOCATION: 'LOCATION';
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 800178)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy)
@@ -55,6 +55,7 @@
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.HiveHBaseTableOutputFormat;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
@@ -2423,20 +2424,29 @@
if(parts != null && parts.size() > 0) {
throw new SemanticException(ErrorMsg.NEED_PARTITION_ERROR.getMsg());
}
+
+ // check whether the destination table is an HBase-backed Hive table
+ boolean isHBaseTable = dest_tab.getOutputFormatClass() == HiveHBaseTableOutputFormat.class;
+
dest_path = dest_tab.getPath();
- queryTmpdir = ctx.getExternalTmpFileURI(dest_path.toUri());
+ if (isHBaseTable)
+ queryTmpdir = dest_path.toUri().toString();
+ else
+ queryTmpdir = ctx.getExternalTmpFileURI(dest_path.toUri());
table_desc = Utilities.getTableDesc(dest_tab);
this.idToTableNameMap.put( String.valueOf(this.destTableId), dest_tab.getName());
currentTableId = this.destTableId;
this.destTableId ++;
- // Create the work for moving the table
- this.loadTableWork.add
- (new loadTableDesc(queryTmpdir,
- ctx.getExternalTmpFileURI(dest_path.toUri()),
- table_desc,
- new HashMap<String, String>()));
+ if (!isHBaseTable) {
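+ // HBase-backed tables are written in place by the FileSinkOperator, so no move/load task is needed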
+ // Create the work for moving the table
+ this.loadTableWork.add
+ (new loadTableDesc(queryTmpdir,
+ ctx.getExternalTmpFileURI(dest_path.toUri()),
+ table_desc,
+ new HashMap<String, String>()));
+ }
outputs.add(new WriteEntity(dest_tab));
break;
}
Index: ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java (revision 800178)
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java (working copy)
@@ -26,10 +26,12 @@
import junit.framework.TestCase;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter;
import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
@@ -48,6 +50,7 @@
import org.apache.hadoop.hive.ql.plan.reduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.scriptDesc;
import org.apache.hadoop.hive.ql.plan.selectDesc;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.mapred.TextInputFormat;
@@ -466,7 +469,30 @@
private void executePlan(File planFile) throws Exception {
String testName = new Exception().getStackTrace()[1].getMethodName();
- String cmdLine = conf.getVar(HiveConf.ConfVars.HADOOPBIN) + " jar " + conf.getJar() +
+
+ String libJarsOption;
+ {
+ String addedJars = ExecDriver.getResourceFiles(conf, SessionState.ResourceType.JAR);
+ conf.setVar(ConfVars.HIVEADDEDJARS, addedJars);
+
+ String auxJars = conf.getAuxJars();
+ // Put auxjars and addedjars together into libjars
+ if (StringUtils.isEmpty(addedJars)) {
+ if (StringUtils.isEmpty(auxJars)) {
+ libJarsOption = " ";
+ } else {
+ libJarsOption = " -libjars " + auxJars + " ";
+ }
+ } else {
+ if (StringUtils.isEmpty(auxJars)) {
+ libJarsOption = " -libjars " + addedJars + " ";
+ } else {
+ libJarsOption = " -libjars " + addedJars + "," + auxJars + " ";
+ }
+ }
+ }
+
+ String cmdLine = conf.getVar(HiveConf.ConfVars.HADOOPBIN) + " jar " + libJarsOption + conf.getJar() +
" org.apache.hadoop.hive.ql.exec.ExecDriver -plan " +
planFile.toString() + " " + ExecDriver.generateCmdLine(conf);
System.out.println("Executing: " + cmdLine);
Index: ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java (revision 800178)
+++ ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java (working copy)
@@ -24,6 +24,7 @@
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileWriter;
+import java.io.IOException;
import java.io.PrintStream;
import java.io.Serializable;
import java.net.URI;
@@ -37,13 +38,22 @@
import java.lang.reflect.Method;
import java.lang.reflect.Constructor;
+import org.apache.derby.iapi.sql.dictionary.TableDescriptor;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hive.cli.CliDriver;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -90,6 +100,8 @@
private Object dfs = null;
private boolean miniMr = false;
private Class<?> dfsClass = null;
+ private MiniHBaseCluster hbase = null;
+ private final int NUM_REGIONSERVERS = 1;
public boolean deleteDirectory(File path) {
if (path.exists()) {
@@ -164,10 +176,10 @@
}
public QTestUtil(String outDir, String logDir) throws Exception {
- this(outDir, logDir, false);
+ this(outDir, logDir, false, false);
}
- public QTestUtil(String outDir, String logDir, boolean miniMr) throws Exception {
+ public QTestUtil(String outDir, String logDir, boolean miniMr, boolean miniHBase) throws Exception {
this.outDir = outDir;
this.logDir = logDir;
conf = new HiveConf(Driver.class);
@@ -207,7 +219,25 @@
conf.set("hive.metastore.warehouse.dir", fsName.concat("/build/ql/test/data/warehouse/"));
conf.set("mapred.job.tracker", "localhost:" + mr.getJobTrackerPort());
- }
+ }
+
+ if (miniHBase) {
+ // Setup the hbase Cluster
+ try {
+ conf.setVar(ConfVars.HBASEMASTER, "local");
+ HBaseConfiguration hbaseConf = new HBaseConfiguration(conf);
+ hbase = new MiniHBaseCluster(hbaseConf, NUM_REGIONSERVERS);
+ conf.setVar(ConfVars.HBASEMASTER, hbase.getHMasterAddress().toString());
+ // opening the META table ensures that cluster is running
+ new HTable(new HBaseConfiguration(conf), HConstants.META_TABLE_NAME);
+ } catch (IOException ie) {
+ ie.printStackTrace();
+ if (hbase != null) {
+ hbase.shutdown();
+ }
+ throw ie;
+ }
+ }
// System.out.println(conf.toString());
testFiles = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
@@ -225,6 +255,12 @@
public void shutdown() throws Exception {
cleanUp();
+ if (hbase != null) {
+ HConnectionManager.deleteConnectionInfo(new HBaseConfiguration(conf), true);
+ hbase.shutdown();
+ hbase = null;
+ }
+
if (dfs != null) {
Method m = dfsClass.getDeclaredMethod("shutdown", new Class[]{});
m.invoke(dfs, new Object[]{});
Index: ql/src/test/queries/clienthbase/hbase_queries.q
===================================================================
--- ql/src/test/queries/clienthbase/hbase_queries.q (revision 0)
+++ ql/src/test/queries/clienthbase/hbase_queries.q (revision 0)
@@ -0,0 +1,64 @@
+DROP TABLE hbase_table_1;
+CREATE TABLE hbase_table_1(key int, value string)
+ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.hbase.HBaseSerDe'
+WITH SERDEPROPERTIES (
+"hbase.columns.mapping" = "cf:string"
+) STORED AS HBASETABLE;
+
+EXPLAIN FROM src INSERT OVERWRITE TABLE hbase_table_1 SELECT *;
+FROM src INSERT OVERWRITE TABLE hbase_table_1 SELECT *;
+
+DROP TABLE hbase_table_2;
+CREATE EXTERNAL TABLE hbase_table_2(key int, value string)
+ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.hbase.HBaseSerDe'
+WITH SERDEPROPERTIES (
+"hbase.columns.mapping" = "cf:string"
+) STORED AS HBASETABLE LOCATION "hbase_table_1";
+
+EXPLAIN FROM
+(SELECT hbase_table_1.* FROM hbase_table_1) x
+JOIN
+(SELECT src.* FROM src) Y
+ON (x.key = Y.key)
+SELECT Y.* LIMIT 20;
+
+FROM
+(SELECT hbase_table_1.* FROM hbase_table_1) x
+JOIN
+(SELECT src.* FROM src) Y
+ON (x.key = Y.key)
+SELECT Y.* LIMIT 20;
+
+EXPLAIN FROM
+(SELECT hbase_table_1.* FROM hbase_table_1 WHERE hbase_table_1.key > 100) x
+JOIN
+(SELECT hbase_table_2.* FROM hbase_table_2 WHERE hbase_table_2.key < 120) Y
+ON (x.key = Y.key)
+SELECT Y.*;
+
+FROM
+(SELECT hbase_table_1.* FROM hbase_table_1 WHERE hbase_table_1.key > 100) x
+JOIN
+(SELECT hbase_table_2.* FROM hbase_table_2 WHERE hbase_table_2.key < 120) Y
+ON (x.key = Y.key)
+SELECT Y.*;
+
+DROP TABLE empty_hbase_table;
+CREATE TABLE empty_hbase_table(key int, value string)
+ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.hbase.HBaseSerDe'
+WITH SERDEPROPERTIES (
+"hbase.columns.mapping" = "cf:string"
+) STORED AS HBASETABLE;
+
+DROP TABLE empty_normal_table;
+CREATE TABLE empty_normal_table(key int, value string);
+
+select * from (select count(1) from empty_normal_table union all select count(1) from empty_hbase_table) x;
+select * from (select count(1) from empty_normal_table union all select count(1) from hbase_table_1) x;
+select * from (select count(1) from src union all select count(1) from empty_hbase_table) x;
+select * from (select count(1) from src union all select count(1) from hbase_table_1) x;
+
+DROP TABLE hbase_table_1;
+DROP TABLE hbase_table_2;
+DROP TABLE empty_hbase_table;
+DROP TABLE empty_normal_table;
\ No newline at end of file
Index: ql/src/test/results/clienthbase/hbase_queries.q.out
===================================================================
--- ql/src/test/results/clienthbase/hbase_queries.q.out (revision 0)
+++ ql/src/test/results/clienthbase/hbase_queries.q.out (revision 0)
@@ -0,0 +1,313 @@
+query: DROP TABLE hbase_table_1
+query: CREATE TABLE hbase_table_1(key int, value string)
+ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.hbase.HBaseSerDe'
+WITH SERDEPROPERTIES (
+"hbase.columns.mapping" = "cf:string"
+) STORED AS HBASETABLE
+query: EXPLAIN FROM src INSERT OVERWRITE TABLE hbase_table_1 SELECT *
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_TAB hbase_table_1)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF))))
+
+STAGE DEPENDENCIES:
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-0
+ Map Reduce
+ Alias -> Map Operator Tree:
+ src
+ Select Operator
+ expressions:
+ expr: key
+ type: string
+ expr: value
+ type: string
+ outputColumnNames: _col0, _col1
+ Select Operator
+ expressions:
+ expr: UDFToInteger(_col0)
+ type: int
+ expr: _col1
+ type: string
+ outputColumnNames: _col0, _col1
+ File Output Operator
+ compressed: false
+ GlobalTableId: 1
+ table:
+ input format: org.apache.hadoop.hive.ql.io.HiveHBaseTableInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveHBaseTableOutputFormat
+ serde: org.apache.hadoop.hive.serde2.hbase.HBaseSerDe
+ name: hbase_table_1
+
+
+query: FROM src INSERT OVERWRITE TABLE hbase_table_1 SELECT *
+Input: default/src
+Output: default/hbase_table_1
+query: DROP TABLE hbase_table_2
+query: CREATE EXTERNAL TABLE hbase_table_2(key int, value string)
+ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.hbase.HBaseSerDe'
+WITH SERDEPROPERTIES (
+"hbase.columns.mapping" = "cf:string"
+) STORED AS HBASETABLE LOCATION "hbase_table_1"
+query: EXPLAIN FROM
+(SELECT hbase_table_1.* FROM hbase_table_1) x
+JOIN
+(SELECT src.* FROM src) Y
+ON (x.key = Y.key)
+SELECT Y.* LIMIT 20
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF hbase_table_1)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_ALLCOLREF hbase_table_1))))) x) (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_ALLCOLREF src))))) Y) (= (. (TOK_TABLE_OR_COL x) key) (. (TOK_TABLE_OR_COL Y) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_ALLCOLREF Y))) (TOK_LIMIT 20)))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ x:hbase_table_1
+ Select Operator
+ expressions:
+ expr: key
+ type: int
+ outputColumnNames: _col0
+ Reduce Output Operator
+ key expressions:
+ expr: UDFToDouble(_col0)
+ type: double
+ sort order: +
+ Map-reduce partition columns:
+ expr: UDFToDouble(_col0)
+ type: double
+ tag: 0
+ y:src
+ Select Operator
+ expressions:
+ expr: key
+ type: string
+ expr: value
+ type: string
+ outputColumnNames: _col0, _col1
+ Reduce Output Operator
+ key expressions:
+ expr: UDFToDouble(_col0)
+ type: double
+ sort order: +
+ Map-reduce partition columns:
+ expr: UDFToDouble(_col0)
+ type: double
+ tag: 1
+ value expressions:
+ expr: _col0
+ type: string
+ expr: _col1
+ type: string
+ Reduce Operator Tree:
+ Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0
+ 1 {VALUE._col0} {VALUE._col1}
+ outputColumnNames: _col2, _col3
+ Select Operator
+ expressions:
+ expr: _col2
+ type: string
+ expr: _col3
+ type: string
+ outputColumnNames: _col0, _col1
+ Limit
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: 20
+
+
+query: FROM
+(SELECT hbase_table_1.* FROM hbase_table_1) x
+JOIN
+(SELECT src.* FROM src) Y
+ON (x.key = Y.key)
+SELECT Y.* LIMIT 20
+Input: default/hbase_table_1
+Input: default/src
+Output: file:/home/lmsp/projects/hive/build/ql/tmp/268562938/10000
+0 val_0
+0 val_0
+0 val_0
+2 val_2
+4 val_4
+5 val_5
+5 val_5
+5 val_5
+8 val_8
+9 val_9
+10 val_10
+11 val_11
+12 val_12
+12 val_12
+15 val_15
+15 val_15
+17 val_17
+18 val_18
+18 val_18
+19 val_19
+query: EXPLAIN FROM
+(SELECT hbase_table_1.* FROM hbase_table_1 WHERE hbase_table_1.key > 100) x
+JOIN
+(SELECT hbase_table_2.* FROM hbase_table_2 WHERE hbase_table_2.key < 120) Y
+ON (x.key = Y.key)
+SELECT Y.*
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF hbase_table_1)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_ALLCOLREF hbase_table_1))) (TOK_WHERE (> (. (TOK_TABLE_OR_COL hbase_table_1) key) 100)))) x) (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF hbase_table_2)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_ALLCOLREF hbase_table_2))) (TOK_WHERE (< (. (TOK_TABLE_OR_COL hbase_table_2) key) 120)))) Y) (= (. (TOK_TABLE_OR_COL x) key) (. (TOK_TABLE_OR_COL Y) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_ALLCOLREF Y)))))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ y:hbase_table_2
+ Filter Operator
+ predicate:
+ expr: (key < 120)
+ type: boolean
+ Filter Operator
+ predicate:
+ expr: (key < 120)
+ type: boolean
+ Select Operator
+ expressions:
+ expr: key
+ type: int
+ expr: value
+ type: string
+ outputColumnNames: _col0, _col1
+ Reduce Output Operator
+ key expressions:
+ expr: _col0
+ type: int
+ sort order: +
+ Map-reduce partition columns:
+ expr: _col0
+ type: int
+ tag: 1
+ value expressions:
+ expr: _col0
+ type: int
+ expr: _col1
+ type: string
+ x:hbase_table_1
+ Filter Operator
+ predicate:
+ expr: (key > 100)
+ type: boolean
+ Filter Operator
+ predicate:
+ expr: (key > 100)
+ type: boolean
+ Select Operator
+ expressions:
+ expr: key
+ type: int
+ outputColumnNames: _col0
+ Reduce Output Operator
+ key expressions:
+ expr: _col0
+ type: int
+ sort order: +
+ Map-reduce partition columns:
+ expr: _col0
+ type: int
+ tag: 0
+ Reduce Operator Tree:
+ Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0
+ 1 {VALUE._col0} {VALUE._col1}
+ outputColumnNames: _col2, _col3
+ Select Operator
+ expressions:
+ expr: _col2
+ type: int
+ expr: _col3
+ type: string
+ outputColumnNames: _col0, _col1
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+
+
+query: FROM
+(SELECT hbase_table_1.* FROM hbase_table_1 WHERE hbase_table_1.key > 100) x
+JOIN
+(SELECT hbase_table_2.* FROM hbase_table_2 WHERE hbase_table_2.key < 120) Y
+ON (x.key = Y.key)
+SELECT Y.*
+Input: default/hbase_table_2
+Input: default/hbase_table_1
+Output: file:/home/lmsp/projects/hive/build/ql/tmp/394656156/10000
+103 val_103
+104 val_104
+105 val_105
+111 val_111
+113 val_113
+114 val_114
+116 val_116
+118 val_118
+119 val_119
+query: DROP TABLE empty_hbase_table
+query: CREATE TABLE empty_hbase_table(key int, value string)
+ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.hbase.HBaseSerDe'
+WITH SERDEPROPERTIES (
+"hbase.columns.mapping" = "cf:string"
+) STORED AS HBASETABLE
+query: DROP TABLE empty_normal_table
+query: CREATE TABLE empty_normal_table(key int, value string)
+query: select * from (select count(1) from empty_normal_table union all select count(1) from empty_hbase_table) x
+Input: default/empty_hbase_table
+Input: default/empty_normal_table
+Output: file:/home/lmsp/projects/hive/build/ql/tmp/1480628629/10000
+0
+0
+query: select * from (select count(1) from empty_normal_table union all select count(1) from hbase_table_1) x
+Input: default/empty_normal_table
+Input: default/hbase_table_1
+Output: file:/home/lmsp/projects/hive/build/ql/tmp/1722384298/10000
+309
+0
+query: select * from (select count(1) from src union all select count(1) from empty_hbase_table) x
+Input: default/empty_hbase_table
+Input: default/src
+Output: file:/home/lmsp/projects/hive/build/ql/tmp/1091258489/10000
+500
+0
+query: select * from (select count(1) from src union all select count(1) from hbase_table_1) x
+Input: default/src
+Input: default/hbase_table_1
+Output: file:/home/lmsp/projects/hive/build/ql/tmp/1515840724/10000
+309
+500
+query: DROP TABLE hbase_table_1
+query: DROP TABLE hbase_table_2
+query: DROP TABLE empty_hbase_table
+query: DROP TABLE empty_normal_table
Index: ql/src/test/templates/TestCliDriver.vm
===================================================================
--- ql/src/test/templates/TestCliDriver.vm (revision 800178)
+++ ql/src/test/templates/TestCliDriver.vm (working copy)
@@ -36,7 +36,10 @@
if ("$clusterMode".equals("miniMR"))
miniMR = true;
- qt = new QTestUtil("$resultsDir.getCanonicalPath()", "$logDir.getCanonicalPath()", miniMR);
+ // HBase queries are kept separate from the normal Hive queries so that
+ // the MiniHBaseCluster only needs to be started while the HBase
+ // queries are being run.
+ qt = new QTestUtil("$resultsDir.getCanonicalPath()", "$logDir.getCanonicalPath()", miniMR, $miniHBase);
#foreach ($qf in $qfiles)
qt.addFile("$qf.getCanonicalPath()");
Index: serde/src/java/org/apache/hadoop/hive/serde2/hbase/HBaseSerDe.java
===================================================================
--- serde/src/java/org/apache/hadoop/hive/serde2/hbase/HBaseSerDe.java (revision 0)
+++ serde/src/java/org/apache/hadoop/hive/serde2/hbase/HBaseSerDe.java (revision 0)
@@ -0,0 +1,456 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.hbase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
+import org.apache.hadoop.hive.serde2.lazy.LazyHBaseRow;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * HBaseSerDe can be used to serialize object into hbase table and
+ * deserialize object from hbase table.
+ */
+public class HBaseSerDe implements SerDe {
+
+ public static final Log LOG = LogFactory.getLog(
+ HBaseSerDe.class.getName());
+
+ public static class HBaseSerDeParameters {
+ List<String> hbaseColumnNames;
+ SerDeParameters serdeParams;
+
+ public List<String> getHBaseColumnNames() {
+ return hbaseColumnNames;
+ }
+
+ public SerDeParameters getSerDeParameters() {
+ return serdeParams;
+ }
+ }
+
+ private ObjectInspector mCachedObjectInspector;
+ HBaseSerDeParameters mHBaseSerDeParameters = null;
+ private boolean useJSONSerialize; // use json to serialize
+
+ public String toString() {
+ return getClass().toString()
+ + "["
+ + mHBaseSerDeParameters.hbaseColumnNames
+ + ":"
+ + ((StructTypeInfo) mHBaseSerDeParameters.serdeParams.getRowTypeInfo())
+ .getAllStructFieldNames()
+ + ":"
+ + ((StructTypeInfo) mHBaseSerDeParameters.serdeParams.getRowTypeInfo())
+ .getAllStructFieldTypeInfos() + "]";
+ }
+
+ public HBaseSerDe() throws SerDeException {
+ }
+
+ /**
+ * Initialize the SerDe given parameters.
+ * @see SerDe#initialize(Configuration, Properties)
+ */
+ public void initialize(Configuration conf, Properties tbl)
+ throws SerDeException {
+ mHBaseSerDeParameters = HBaseSerDe.initHBaseSerDeParameters(conf, tbl,
+ getClass().getName());
+
+ // Only the columnNames and columnTypes parameters are used here
+ mCachedObjectInspector = LazyFactory.createLazyStructInspector(
+ mHBaseSerDeParameters.serdeParams.getColumnNames(),
+ mHBaseSerDeParameters.serdeParams.getColumnTypes(),
+ mHBaseSerDeParameters.serdeParams.getSeparators(),
+ mHBaseSerDeParameters.serdeParams.getNullSequence(),
+ mHBaseSerDeParameters.serdeParams.isLastColumnTakesRest(),
+ mHBaseSerDeParameters.serdeParams.isEscaped(),
+ mHBaseSerDeParameters.serdeParams.getEscapeChar());
+
+ mCachedHBaseRow = new LazyHBaseRow((LazySimpleStructObjectInspector)mCachedObjectInspector);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("HBaseSerDe initialized with : columnNames = "
+ + mHBaseSerDeParameters.serdeParams.getColumnNames() + " columnTypes = "
+ + mHBaseSerDeParameters.serdeParams.getColumnTypes() + " hbaseColumnMapping = "
+ + mHBaseSerDeParameters.hbaseColumnNames);
+ }
+ }
+
+ public static HBaseSerDeParameters initHBaseSerDeParameters(
+ Configuration job, Properties tbl, String serdeName)
+ throws SerDeException {
+ HBaseSerDeParameters serdeParams = new HBaseSerDeParameters();
+
+ // Read Configuration Parameter
+ String hbaseColumnNameProperty = tbl.getProperty("hbase.columns.mapping");
+ String columnTypeProperty = tbl.getProperty(Constants.LIST_COLUMN_TYPES);
+
+ // Initialize the hbase column list
+ if (hbaseColumnNameProperty != null && hbaseColumnNameProperty.length() > 0) {
+ serdeParams.hbaseColumnNames = Arrays.asList(hbaseColumnNameProperty.split(","));
+ } else {
+ serdeParams.hbaseColumnNames = new ArrayList<String>();
+ }
+
+ // Add the hbase key to the columnNameList and columnTypeList
+
+ // Build the type property string
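+ // e.g. for "hbase.columns.mapping" = "cf1:col1,cf2:" this produces "string:string:map<string,string>" (row key first)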
+ if (columnTypeProperty == null) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(Constants.STRING_TYPE_NAME);
+
+ for (int i = 0; i < serdeParams.hbaseColumnNames.size(); i++) {
+ String colName = serdeParams.hbaseColumnNames.get(i);
+ if(colName.endsWith(":"))
+ sb.append(":").append(Constants.MAP_TYPE_NAME + "<" +
+ Constants.STRING_TYPE_NAME + "," + Constants.STRING_TYPE_NAME + ">");
+ else
+ sb.append(":").append(Constants.STRING_TYPE_NAME);
+ }
+ tbl.setProperty(Constants.LIST_COLUMN_TYPES, sb.toString());
+ }
+
+ serdeParams.serdeParams = LazySimpleSerDe.initSerdeParams(job, tbl, serdeName);
+
+ if (serdeParams.hbaseColumnNames.size() + 1 != serdeParams.serdeParams.getColumnNames().size()) {
+ throw new SerDeException(serdeName + ": columns has " +
+ serdeParams.serdeParams.getColumnNames().size() +
+ " elements while hbase.columns.mapping has " +
+ serdeParams.hbaseColumnNames.size() + " elements!");
+ }
+
+ // check that the mapping schema is consistent:
+ // all we can verify is that a "columnfamily:" entry is mapped to a MAP type
+ for (int i = 0; i < serdeParams.hbaseColumnNames.size(); i++) {
+ String hbaseColName = serdeParams.hbaseColumnNames.get(i);
+ if(hbaseColName.endsWith(":")) {
+ TypeInfo typeInfo = serdeParams.serdeParams.getColumnTypes().get(i+1);
+ if(typeInfo.getCategory() == Category.MAP &&
+ ((MapTypeInfo)typeInfo).getMapKeyTypeInfo().getTypeName() != Constants.STRING_TYPE_NAME) {
+ throw new SerDeException(serdeName + ": hbase column family '" +
+ hbaseColName + "' should be mapped to Map while being mapped to " +
+ ((MapTypeInfo)typeInfo).getMapKeyTypeInfo().getTypeName());
+ }
+ }
+ }
+
+ return serdeParams;
+ }
+
+ // The object for storing hbase row data.
+ LazyHBaseRow mCachedHBaseRow;
+
+ /**
+ * Deserialize a row from the HBase RowResult writable to a LazyObject
+ * @param rowResult the HBase RowResult Writable contain a row
+ * @return the deserialized object
+ * @see SerDe#deserialize(Writable)
+ */
+ public Object deserialize(Writable rowResult) throws SerDeException {
+
+ if (!(rowResult instanceof RowResult)) {
+ throw new SerDeException(getClass().getName() + ": expects RowResult!");
+ }
+
+ RowResult rr = (RowResult)rowResult;
+ mCachedHBaseRow.init(rr, mHBaseSerDeParameters.hbaseColumnNames);
+ return mCachedHBaseRow;
+ }
+
+ @Override
+ public ObjectInspector getObjectInspector() throws SerDeException {
+ return mCachedObjectInspector;
+ }
+
+ BatchUpdate serializeCache = null;
+ ByteStream.Output serializeStream = new ByteStream.Output();
+
+ @Override
+ public Class<? extends Writable> getSerializedClass() {
+ return BatchUpdate.class;
+ }
+
+ @Override
+ public Writable serialize(Object obj, ObjectInspector objInspector)
+ throws SerDeException {
+ if (objInspector.getCategory() != Category.STRUCT) {
+ throw new SerDeException(getClass().toString()
+ + " can only serialize struct types, but we got: "
+ + objInspector.getTypeName());
+ }
+
+ // Prepare the field ObjectInspectors
+ StructObjectInspector soi = (StructObjectInspector)objInspector;
+ List<? extends StructField> fields = soi.getAllStructFieldRefs();
+ List