diff --git lib/README lib/README
index 1c2f0b1..d6e148d 100644
--- lib/README
+++ lib/README
@@ -13,3 +13,4 @@ Folowing is the list of external jars contained in this directory and the source
* commons-collections-3.2.1.jar - http://commons.apache.org/downloads/download_collections.cgi
* commons-lang-2.4.jar - http://commons.apache.org/downloads/download_lang.cgi
* json.jar - http://www.json.org/java/index.html
+* javaewah-0.2.jar - compiled from https://code.google.com/p/javaewah/
diff --git lib/javaewah-0.2.jar lib/javaewah-0.2.jar
new file mode 100644
index 0000000..1f48093
Binary files /dev/null and lib/javaewah-0.2.jar differ
diff --git ql/build.xml ql/build.xml
index 50c604e..1f9941a 100644
--- ql/build.xml
+++ ql/build.xml
@@ -184,6 +184,12 @@
+
+
+
+
+
+
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
index ba222f3..16a207e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
@@ -122,6 +122,7 @@ import org.apache.hadoop.hive.ql.udf.UDFUpper;
import org.apache.hadoop.hive.ql.udf.UDFWeekOfYear;
import org.apache.hadoop.hive.ql.udf.UDFYear;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFAverage;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEWAHBitmap;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBridge;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCollectSet;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFContextNGrams;
@@ -146,6 +147,9 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFnGrams;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFArray;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFArrayContains;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFEWAHBitmapAnd;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFEWAHBitmapOr;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFEWAHBitmapEmpty;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFCase;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFCoalesce;
@@ -339,6 +343,10 @@ public final class FunctionRegistry {
registerGenericUDF("not", GenericUDFOPNot.class);
registerGenericUDF("!", GenericUDFOPNot.class);
+ registerGenericUDF("ewah_bitmap_and", GenericUDFEWAHBitmapAnd.class);
+ registerGenericUDF("ewah_bitmap_or", GenericUDFEWAHBitmapOr.class);
+ registerGenericUDF("ewah_bitmap_empty", GenericUDFEWAHBitmapEmpty.class);
+
// Aliases for Java Class Names
// These are used in getImplicitConvertUDFMethod
@@ -384,6 +392,8 @@ public final class FunctionRegistry {
registerGenericUDAF("ngrams", new GenericUDAFnGrams());
registerGenericUDAF("context_ngrams", new GenericUDAFContextNGrams());
+ registerGenericUDAF("ewah_bitmap", new GenericUDAFEWAHBitmap());
+
registerUDAF("percentile", UDAFPercentile.class);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
index ff74f08..96a1631 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
@@ -566,6 +566,17 @@ public class MapOperator extends Operator<MapredWork> implements Serializable {
if (current != old.get()) {
old.set(current);
}
+ } else if (vc.equals(VirtualColumn.ROWOFFSET)) {
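+ // Populate the ROWOFFSET virtual column from the reader's current row position, reusing the LongWritable once allocated.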
+ long current = ioCxt.getCurrentRow();
+ LongWritable old = (LongWritable) this.vcValues[i];
+ if (old == null) {
+ old = new LongWritable(current);
+ this.vcValues[i] = old;
+ continue;
+ }
+ if (current != old.get()) {
+ old.set(current);
+ }
}
}
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndex.java ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndex.java
index 308d985..591c9ff 100644
--- ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndex.java
+++ ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndex.java
@@ -30,7 +30,9 @@ public class HiveIndex {
public static String INDEX_TABLE_CREATETIME = "hive.index.basetbl.dfs.lastModifiedTime";
public static enum IndexType {
- COMPACT_SUMMARY_TABLE("compact", "org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler");
+ COMPACT_SUMMARY_TABLE("compact", "org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler"),
+ BITMAP_TABLE("bitmap",
+"org.apache.hadoop.hive.ql.index.bitmap.BitmapIndexHandler");
private IndexType(String indexType, String className) {
indexTypeName = indexType;
diff --git ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java
new file mode 100644
index 0000000..cbb3f0b
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.index;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.LineRecordReader.LineReader;
+
+public class HiveIndexResult {
+
+ public static final Log l4j =
+ LogFactory.getLog(HiveIndexResult.class.getSimpleName());
+
+ // IndexBucket
+ static class IBucket {
+ private String name = null;
+ private SortedSet<Long> offsets = new TreeSet<Long>();
+
+ public IBucket(String n) {
+ name = n;
+ }
+
+ public void add(Long offset) {
+ offsets.add(offset);
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public SortedSet<Long> getOffsets() {
+ return offsets;
+ }
+
+ public boolean equals(Object obj) {
+ if (obj.getClass() != this.getClass()) {
+ return false;
+ }
+ return (((IBucket) obj).name.compareToIgnoreCase(this.name) == 0);
+ }
+ }
+
+ JobConf job = null;
+ BytesRefWritable[] bytesRef = new BytesRefWritable[2];
+ boolean ignoreHdfsLoc = false;
+
+ public HiveIndexResult(String indexFile, JobConf conf) throws IOException,
+ HiveException {
+ job = conf;
+
+ bytesRef[0] = new BytesRefWritable();
+ bytesRef[1] = new BytesRefWritable();
+ ignoreHdfsLoc = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_INDEX_IGNORE_HDFS_LOC);
+
+ if (indexFile != null) {
+ Path indexFilePath = new Path(indexFile);
+ FileSystem fs = FileSystem.get(conf);
+ FileStatus indexStat = fs.getFileStatus(indexFilePath);
+ List<Path> paths = new ArrayList<Path>();
+ if (indexStat.isDir()) {
+ FileStatus[] fss = fs.listStatus(indexFilePath);
+ for (FileStatus f : fss) {
+ paths.add(f.getPath());
+ }
+ } else {
+ paths.add(indexFilePath);
+ }
+
+ for (Path indexFinalPath : paths) {
+ FSDataInputStream ifile = fs.open(indexFinalPath);
+ LineReader lr = new LineReader(ifile, conf);
+ try {
+ Text line = new Text();
+ while (lr.readLine(line) > 0) {
+ add(line);
+ }
+ }
+ finally {
+ // this will close the input stream
+ lr.close();
+ }
+ }
+ }
+ }
+
+ Map<String, IBucket> buckets = new HashMap<String, IBucket>();
+
+ private void add(Text line) throws HiveException {
+ String l = line.toString();
+ byte[] bytes = l.getBytes();
+ int firstEnd = 0;
+ int i = 0;
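+ // Each index row has the form "bucket_file_name^Aoffset^Boffset^B..."; locate the single Ctrl-A that separates the two columns.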
+ for (int index = 0; index < bytes.length; index++) {
+ if (bytes[index] == LazySimpleSerDe.DefaultSeparators[0]) {
+ i++;
+ firstEnd = index;
+ }
+ }
+ if (i > 1) {
+ throw new HiveException(
+ "Bad index file row (index file should only contain two columns: bucket_file_name and offset lists.) ."
+ + line.toString());
+ }
+ String bucketFileName = new String(bytes, 0, firstEnd);
+
+ if (ignoreHdfsLoc) {
+ Path tmpPath = new Path(bucketFileName);
+ bucketFileName = tmpPath.toUri().getPath();
+ }
+ IBucket bucket = buckets.get(bucketFileName);
+ if (bucket == null) {
+ bucket = new IBucket(bucketFileName);
+ buckets.put(bucketFileName, bucket);
+ }
+
+ int currentStart = firstEnd + 1;
+ int currentEnd = firstEnd + 1;
+ for (; currentEnd < bytes.length; currentEnd++) {
+ if (bytes[currentEnd] == LazySimpleSerDe.DefaultSeparators[1]) {
+ String one_offset = new String(bytes, currentStart, currentEnd
+ - currentStart);
+ Long offset = Long.parseLong(one_offset);
+ bucket.getOffsets().add(offset);
+ currentStart = currentEnd + 1;
+ }
+ }
+ String one_offset = new String(bytes, currentStart, currentEnd
+ - currentStart);
+ bucket.getOffsets().add(Long.parseLong(one_offset));
+ }
+
+ public boolean contains(FileSplit split) throws HiveException {
+
+ if (buckets == null) {
+ return false;
+ }
+ String bucketName = split.getPath().toString();
+ IBucket bucket = buckets.get(bucketName);
+ if (bucket == null) {
+ bucketName = split.getPath().toUri().getPath();
+ bucket = buckets.get(bucketName);
+ if (bucket == null) {
+ return false;
+ }
+ }
+
+ for (Long offset : bucket.getOffsets()) {
+ if ((offset >= split.getStart())
+ && (offset <= split.getStart() + split.getLength())) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexedInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexedInputFormat.java
new file mode 100644
index 0000000..bdef698
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexedInputFormat.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.index;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.io.IOPrepareCache;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+public class HiveIndexedInputFormat extends HiveInputFormat {
+ public static final Log l4j = LogFactory.getLog("HiveIndexInputFormat");
+ protected final String indexFile = "hive.index.indextablefile";
+
+ public HiveIndexedInputFormat() {
+ super();
+ }
+
+ public InputSplit[] doGetSplits(JobConf job, int numSplits) throws IOException {
+
+ super.init(job);
+
+ Path[] dirs = FileInputFormat.getInputPaths(job);
+ if (dirs.length == 0) {
+ throw new IOException("No input paths specified in job");
+ }
+ JobConf newjob = new JobConf(job);
+ ArrayList<HiveInputSplit> result = new ArrayList<HiveInputSplit>();
+
+ // for each dir, get the InputFormat, and do getSplits.
+ PartitionDesc part;
+ for (Path dir : dirs) {
+ part = HiveFileFormatUtils
+ .getPartitionDescFromPathRecursively(pathToPartitionInfo, dir,
+ IOPrepareCache.get().allocatePartitionDescMap(), true);
+ // create a new InputFormat instance if this is the first time to see this
+ // class
+ Class inputFormatClass = part.getInputFileFormatClass();
+ InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
+ Utilities.copyTableJobPropertiesToConf(part.getTableDesc(), newjob);
+
+ FileInputFormat.setInputPaths(newjob, dir);
+ newjob.setInputFormat(inputFormat.getClass());
+ InputSplit[] iss = inputFormat.getSplits(newjob, numSplits / dirs.length);
+ for (InputSplit is : iss) {
+ result.add(new HiveInputSplit(is, inputFormatClass.getName()));
+ }
+ }
+ return result.toArray(new HiveInputSplit[result.size()]);
+ }
+
+ @Override
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ String indexFileStr = job.get(indexFile);
+ l4j.info("index_file is " + indexFileStr);
+
+ HiveIndexResult hiveIndexResult = null;
+ if (indexFileStr != null) {
+ try {
+ hiveIndexResult = new HiveIndexResult(indexFileStr, job);
+ } catch (HiveException e) {
+ l4j.error("Unable to read index..");
+ throw new IOException(e);
+ }
+
+ Set<String> inputFiles = hiveIndexResult.buckets.keySet();
+ Iterator<String> iter = inputFiles.iterator();
+ boolean first = true;
+ StringBuilder newInputPaths = new StringBuilder();
+ while(iter.hasNext()) {
+ String path = iter.next();
+ if (path.trim().equalsIgnoreCase("")) {
+ continue;
+ }
+ if (!first) {
+ newInputPaths.append(",");
+ } else {
+ first = false;
+ }
+ newInputPaths.append(path);
+ }
+
+ FileInputFormat.setInputPaths(job, newInputPaths.toString());
+ } else {
+ return super.getSplits(job, numSplits);
+ }
+
+ HiveInputSplit[] splits = (HiveInputSplit[]) this.doGetSplits(job, numSplits);
+
+ ArrayList<HiveInputSplit> newSplits = new ArrayList<HiveInputSplit>(
+ numSplits);
+ for (HiveInputSplit split : splits) {
+ l4j.info("split start : " + split.getStart());
+ l4j.info("split end : " + (split.getStart() + split.getLength()));
+
+ try {
+ if (hiveIndexResult.contains(split)) {
+ // we may miss a sync here
+ HiveInputSplit newSplit = split;
+ if (split.inputFormatClassName().contains("RCFile")
+ || split.inputFormatClassName().contains("SequenceFile")) {
+ if (split.getStart() > SequenceFile.SYNC_INTERVAL) {
+ newSplit = new HiveInputSplit(new FileSplit(split.getPath(), split
+ .getStart()
+ - SequenceFile.SYNC_INTERVAL, split.getLength()
+ + SequenceFile.SYNC_INTERVAL, split.getLocations()), split
+ .inputFormatClassName());
+ }
+ }
+ newSplits.add(newSplit);
+ }
+ } catch (HiveException e) {
+ throw new RuntimeException(
+ "Unable to get metadata for input table split" + split.getPath());
+ }
+ }
+ InputSplit retA[] = newSplits.toArray((new FileSplit[newSplits.size()]));
+ l4j.info("Number of input splits: " + splits.length + " new input splits: "
+ + retA.length);
+ return retA;
+ }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/index/IndexMetadataChangeTask.java ql/src/java/org/apache/hadoop/hive/ql/index/IndexMetadataChangeTask.java
new file mode 100644
index 0000000..b30a1f1
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/index/IndexMetadataChangeTask.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.index;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+
+public class IndexMetadataChangeTask extends Task<IndexMetadataChangeWork> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ protected int execute(DriverContext driverContext) {
+
+ try {
+ Hive db = Hive.get(conf);
+ IndexMetadataChangeWork work = this.getWork();
+ String tblName = work.getIndexTbl();
+ Table tbl = db.getTable(work.getDbName(), tblName);
+ if (tbl == null ) {
+ console.printError("Index table can not be null.");
+ return 1;
+ }
+
+ if (!tbl.getTableType().equals(TableType.INDEX_TABLE)) {
+ console.printError("Table " + tbl.getTableName() + " not specified.");
+ return 1;
+ }
+
+ if (tbl.isPartitioned() && work.getPartSpec() == null) {
+ console.printError("Index table is partitioned, but no partition specified.");
+ return 1;
+ }
+
+ if (work.getPartSpec() != null) {
+ Partition part = db.getPartition(tbl, work.getPartSpec(), false);
+ if (part == null) {
+ console.printError("Partition " +
+ Warehouse.makePartName(work.getPartSpec(), false).toString()
+ + " does not exist.");
+ return 1;
+ }
+
+ Path url = new Path(part.getDataLocation().toString());
+ FileSystem fs = url.getFileSystem(conf);
+ FileStatus fstat = fs.getFileStatus(url);
+
+ part.getParameters().put(HiveIndex.INDEX_TABLE_CREATETIME, Long.toString(fstat.getModificationTime()));
+ db.alterPartition(tbl.getTableName(), part);
+ } else {
+ Path url = new Path(tbl.getDataLocation().toString());
+ FileSystem fs = url.getFileSystem(conf);
+ FileStatus fstat = fs.getFileStatus(url);
+ tbl.getParameters().put(HiveIndex.INDEX_TABLE_CREATETIME, Long.toString(fstat.getModificationTime()));
+ db.alterTable(tbl.getTableName(), tbl);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ console.printError("Error changing index table/partition metadata "
+ + e.getMessage());
+ return 1;
+ }
+ return 0;
+ }
+
+ @Override
+ public String getName() {
+ return IndexMetadataChangeTask.class.getSimpleName();
+ }
+
+ @Override
+ public StageType getType() {
+ return StageType.DDL;
+ }
+
+ @Override
+ protected void localizeMRTmpFilesImpl(Context ctx) {
+ }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/index/IndexMetadataChangeWork.java ql/src/java/org/apache/hadoop/hive/ql/index/IndexMetadataChangeWork.java
new file mode 100644
index 0000000..59a9bd6
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/index/IndexMetadataChangeWork.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.index;
+
+import java.io.Serializable;
+import java.util.HashMap;
+
+public class IndexMetadataChangeWork implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private HashMap<String, String> partSpec;
+ private String indexTbl;
+ private String dbName;
+
+ public IndexMetadataChangeWork() {
+ }
+
+ public IndexMetadataChangeWork(HashMap<String, String> partSpec,
+ String indexTbl, String dbName) {
+ super();
+ this.partSpec = partSpec;
+ this.indexTbl = indexTbl;
+ this.dbName = dbName;
+ }
+
+ public HashMap<String, String> getPartSpec() {
+ return partSpec;
+ }
+
+ public void setPartSpec(HashMap<String, String> partSpec) {
+ this.partSpec = partSpec;
+ }
+
+ public String getIndexTbl() {
+ return indexTbl;
+ }
+
+ public void setIndexTbl(String indexTbl) {
+ this.indexTbl = indexTbl;
+ }
+
+ public String getDbName() {
+ return dbName;
+ }
+
+ public void setDbName(String dbName) {
+ this.dbName = dbName;
+ }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java
new file mode 100644
index 0000000..12f37ce
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.index;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+
+public abstract class TableBasedIndexHandler extends AbstractIndexHandler {
+ protected Configuration configuration;
+
+ @Override
+ public List<Task<?>> generateIndexBuildTaskList(
+ org.apache.hadoop.hive.ql.metadata.Table baseTbl,
+ org.apache.hadoop.hive.metastore.api.Index index,
+ List<Partition> indexTblPartitions, List<Partition> baseTblPartitions,
+ org.apache.hadoop.hive.ql.metadata.Table indexTbl,
+ Set<ReadEntity> inputs, Set<WriteEntity> outputs) throws HiveException {
+ try {
+
+ TableDesc desc = Utilities.getTableDesc(indexTbl);
+
+ List<Partition> newBaseTblPartitions = new ArrayList<Partition>();
+
+ List<Task<?>> indexBuilderTasks = new ArrayList<Task<?>>();
+
+ if (!baseTbl.isPartitioned()) {
+ // the table does not have any partitions, so create the index for the
+ // whole table
+ Task<?> indexBuilder = getIndexBuilderMapRedTask(inputs, outputs, index.getSd().getCols(), false,
+ new PartitionDesc(desc, null), indexTbl.getTableName(),
+ new PartitionDesc(Utilities.getTableDesc(baseTbl), null),
+ baseTbl.getTableName(), indexTbl.getDbName());
+ indexBuilderTasks.add(indexBuilder);
+ } else {
+
+ // check whether the index table partitions still exist in the base
+ // table
+ for (int i = 0; i < indexTblPartitions.size(); i++) {
+ Partition indexPart = indexTblPartitions.get(i);
+ Partition basePart = null;
+ for (int j = 0; j < baseTblPartitions.size(); j++) {
+ if (baseTblPartitions.get(j).getName().equals(indexPart.getName())) {
+ basePart = baseTblPartitions.get(j);
+ newBaseTblPartitions.add(baseTblPartitions.get(j));
+ break;
+ }
+ }
+ if (basePart == null) {
+ throw new RuntimeException(
+ "Partitions of base table and index table are inconsistent.");
+ }
+ // for each partition, spawn a map reduce task.
+ Task<?> indexBuilder = getIndexBuilderMapRedTask(inputs, outputs, index.getSd().getCols(), true,
+ new PartitionDesc(indexPart), indexTbl.getTableName(),
+ new PartitionDesc(basePart), baseTbl.getTableName(), indexTbl.getDbName());
+ indexBuilderTasks.add(indexBuilder);
+ }
+ }
+ return indexBuilderTasks;
+ } catch (Exception e) {
+ throw new SemanticException(e);
+ }
+ }
+
+ abstract protected Task<?> getIndexBuilderMapRedTask(Set<ReadEntity> inputs, Set<WriteEntity> outputs,
+ List<FieldSchema> indexField, boolean partitioned,
+ PartitionDesc indexTblPartDesc, String indexTableName,
+ PartitionDesc baseTablePartDesc, String baseTableName, String dbName);
+
+ protected List<String> getPartKVPairStringArray(
+ LinkedHashMap<String, String> partSpec) {
+ List<String> ret = new ArrayList<String>(partSpec.size());
+ Iterator<Entry<String, String>> iter = partSpec.entrySet().iterator();
+ while (iter.hasNext()) {
+ StringBuilder sb = new StringBuilder();
+ Entry<String, String> p = iter.next();
+ sb.append(HiveUtils.unparseIdentifier(p.getKey()));
+ sb.append(" = ");
+ sb.append("'");
+ sb.append(HiveUtils.escapeString(p.getValue()));
+ sb.append("'");
+ ret.add(sb.toString());
+ }
+ return ret;
+ }
+
+ @Override
+ public boolean usesIndexTable() {
+ return true;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return configuration;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.configuration = conf;
+ }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java
new file mode 100644
index 0000000..23e925c
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.index.bitmap;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Index;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.index.TableBasedIndexHandler;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.index.IndexMetadataChangeTask;
+import org.apache.hadoop.hive.ql.index.IndexMetadataChangeWork;
+
+public class BitmapIndexHandler extends TableBasedIndexHandler {
+
+ @Override
+ public void analyzeIndexDefinition(Table baseTable, Index index,
+ Table indexTable) throws HiveException {
+ StorageDescriptor storageDesc = index.getSd();
+ if (this.usesIndexTable() && indexTable != null) {
+ StorageDescriptor indexTableSd = storageDesc.deepCopy();
+ List<FieldSchema> indexTblCols = indexTableSd.getCols();
+ FieldSchema bucketFileName = new FieldSchema("_bucketname", "string", "");
+ indexTblCols.add(bucketFileName);
+ FieldSchema offSets = new FieldSchema("_offset", "bigint", "");
+ indexTblCols.add(offSets);
+ FieldSchema bitmaps = new FieldSchema("_bitmaps", "array<bigint>", "");
+ indexTblCols.add(bitmaps);
+ indexTable.setSd(indexTableSd);
+ }
+ }
+
+ @Override
+ protected Task<?> getIndexBuilderMapRedTask(Set<ReadEntity> inputs, Set<WriteEntity> outputs,
+ List<FieldSchema> indexField, boolean partitioned,
+ PartitionDesc indexTblPartDesc, String indexTableName,
+ PartitionDesc baseTablePartDesc, String baseTableName, String dbName) {
+
+ String indexCols = HiveUtils.getUnparsedColumnNamesFromFieldSchema(indexField);
+
+ //form a new insert overwrite query.
+ StringBuilder command= new StringBuilder();
+ LinkedHashMap<String, String> partSpec = indexTblPartDesc.getPartSpec();
+
+ command.append("INSERT OVERWRITE TABLE " + HiveUtils.unparseIdentifier(indexTableName ));
+ if (partitioned && indexTblPartDesc != null) {
+ command.append(" PARTITION ( ");
+ List<String> ret = getPartKVPairStringArray(partSpec);
+ for (int i = 0; i < ret.size(); i++) {
+ String partKV = ret.get(i);
+ command.append(partKV);
+ if (i < ret.size() - 1) {
+ command.append(",");
+ }
+ }
+ command.append(" ) ");
+ }
+
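+ // SELECT the indexed columns plus the file name, the block offset, and an EWAH bitmap aggregate of the row offsets; the GROUP BY below groups rows per file, block, and indexed value.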
+ command.append(" SELECT ");
+ command.append(indexCols);
+ command.append(",");
+ command.append(VirtualColumn.FILENAME.getName());
+ command.append(",");
+ command.append(VirtualColumn.BLOCKOFFSET.getName());
+ command.append(",");
+ command.append("EWAH_BITMAP(");
+ command.append(VirtualColumn.ROWOFFSET.getName());
+ command.append(")");
+ command.append(" FROM " + HiveUtils.unparseIdentifier(baseTableName) );
+ LinkedHashMap<String, String> basePartSpec = baseTablePartDesc.getPartSpec();
+ if(basePartSpec != null) {
+ command.append(" WHERE ");
+ List<String> pkv = getPartKVPairStringArray(basePartSpec);
+ for (int i = 0; i < pkv.size(); i++) {
+ String partKV = pkv.get(i);
+ command.append(partKV);
+ if (i < pkv.size() - 1) {
+ command.append(" AND ");
+ }
+ }
+ }
+ command.append(" GROUP BY ");
+ command.append(VirtualColumn.FILENAME.getName());
+ command.append(",");
+ command.append(VirtualColumn.BLOCKOFFSET.getName());
+ for (FieldSchema fieldSchema : indexField) {
+ command.append(",");
+ command.append(HiveUtils.unparseIdentifier(fieldSchema.getName()));
+ }
+
+ // Require CLUSTER BY ROWOFFSET if map-side aggregation is off.
+ if (!"true".equals(configuration.get("hive.map.aggr", null))) {
+ command.append(" CLUSTER BY ");
+ command.append(VirtualColumn.ROWOFFSET.getName());
+ }
+
+ Driver driver = new Driver(new HiveConf(getConf(), BitmapIndexHandler.class));
+ driver.compile(command.toString());
+
+ Task> rootTask = driver.getPlan().getRootTasks().get(0);
+ inputs.addAll(driver.getPlan().getInputs());
+ outputs.addAll(driver.getPlan().getOutputs());
+
+ IndexMetadataChangeWork indexMetaChange = new IndexMetadataChangeWork(partSpec, indexTableName, dbName);
+ IndexMetadataChangeTask indexMetaChangeTsk = new IndexMetadataChangeTask();
+ indexMetaChangeTsk.setWork(indexMetaChange);
+ rootTask.addDependentTask(indexMetaChangeTsk);
+
+ return rootTask;
+ }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapObjectInput.java ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapObjectInput.java
new file mode 100644
index 0000000..33213b4
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapObjectInput.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.index.bitmap;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.lazy.LazyLong;
+
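+// Adapter that exposes a list of LongWritable values through the ObjectInput
+// interface, so a compressed bitmap can be deserialized from the array column
+// stored in the index table.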
+public class BitmapObjectInput implements ObjectInput {
+ Iterator<LongWritable> bufferIter;
+ List<LongWritable> buffer;
+
+ public BitmapObjectInput() {
+ buffer = new ArrayList<LongWritable>();
+ bufferIter = buffer.iterator();
+ }
+
+ public BitmapObjectInput(List<LongWritable> l) {
+ readFromList(l);
+ }
+
+ public void readFromList(List<LongWritable> l) {
+ buffer = l;
+ bufferIter = buffer.iterator();
+ }
+
+ @Override
+ public int available() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close() throws IOException {
+ throw new UnsupportedOperationException();
+
+ }
+
+ @Override
+ public int read() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int read(byte[] arg0) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int read(byte[] arg0, int arg1, int arg2) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Object readObject() throws ClassNotFoundException, IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long skip(long arg0) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean readBoolean() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public byte readByte() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public char readChar() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public double readDouble() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public float readFloat() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void readFully(byte[] arg0) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void readFully(byte[] arg0, int arg1, int arg2) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int readInt() throws IOException {
+ if (bufferIter.hasNext()) {
+ LongObjectInspector loi = PrimitiveObjectInspectorFactory.writableLongObjectInspector;
+ Long l = PrimitiveObjectInspectorUtils.getLong(bufferIter.next(), loi);
+ return l.intValue();
+ //return bufferIter.next().intValue();
+ }
+ else {
+ throw new IOException();
+ }
+ }
+
+ @Override
+ public String readLine() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long readLong() throws IOException {
+ //LongObjectInspector loi = PrimitiveObjectInspectorFactory.writableLongObjectInspector;
+ if (bufferIter.hasNext()) {
+ LongObjectInspector loi = PrimitiveObjectInspectorFactory.writableLongObjectInspector;
+ return PrimitiveObjectInspectorUtils.getLong(bufferIter.next(), loi);
+ //return bufferIter.next();
+ }
+ else {
+ throw new IOException();
+ }
+ }
+
+ @Override
+ public short readShort() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String readUTF() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int readUnsignedByte() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int readUnsignedShort() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int skipBytes(int n) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapObjectOutput.java ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapObjectOutput.java
new file mode 100644
index 0000000..58d7563
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapObjectOutput.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.index.bitmap;
+
+import java.io.IOException;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+
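+// Adapter that collects the values a bitmap writes through the ObjectOutput
+// interface into a list of LongWritable, so the bitmap can be stored in the
+// index table's array column.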
+public class BitmapObjectOutput implements ObjectOutput {
+ ArrayList<LongWritable> buffer = new ArrayList<LongWritable>();
+
+ public List<LongWritable> list() {
+ return buffer;
+ }
+
+ @Override
+ public void close() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void flush() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void write(int arg0) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void write(byte[] arg0) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void write(byte[] arg0, int arg1, int arg2) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void writeObject(Object arg0) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void writeBoolean(boolean arg0) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void writeByte(int arg0) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void writeBytes(String arg0) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void writeChar(int arg0) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void writeChars(String arg0) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void writeDouble(double v) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void writeFloat(float v) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void writeInt(int v) throws IOException {
+ buffer.add(new LongWritable(v));
+ }
+
+ @Override
+ public void writeLong(long v) throws IOException {
+ buffer.add(new LongWritable(v));
+ }
+
+ @Override
+ public void writeShort(int v) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void writeUTF(String s) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
index 1f01446..f90d64f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.hive.ql.index.compact;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@@ -28,7 +27,6 @@ import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Index;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -38,8 +36,9 @@ import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
-import org.apache.hadoop.hive.ql.index.AbstractIndexHandler;
-import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.index.TableBasedIndexHandler;
+import org.apache.hadoop.hive.ql.index.IndexMetadataChangeTask;
+import org.apache.hadoop.hive.ql.index.IndexMetadataChangeWork;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -48,8 +47,8 @@ import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
-public class CompactIndexHandler extends AbstractIndexHandler {
-
+public class CompactIndexHandler extends TableBasedIndexHandler {
+
private Configuration configuration;
@Override
@@ -68,63 +67,11 @@ public class CompactIndexHandler extends AbstractIndexHandler {
}
@Override
- public List<Task<?>> generateIndexBuildTaskList(
- org.apache.hadoop.hive.ql.metadata.Table baseTbl,
- org.apache.hadoop.hive.metastore.api.Index index,
- List<Partition> indexTblPartitions, List<Partition> baseTblPartitions,
- org.apache.hadoop.hive.ql.metadata.Table indexTbl,
- Set<ReadEntity> inputs, Set<WriteEntity> outputs) throws HiveException {
- try {
-
- TableDesc desc = Utilities.getTableDesc(indexTbl);
-
- List<Partition> newBaseTblPartitions = new ArrayList<Partition>();
-
- List<Task<?>> indexBuilderTasks = new ArrayList<Task<?>>();
-
- if (!baseTbl.isPartitioned()) {
- // the table does not have any partition, then create index for the
- // whole table
- Task<?> indexBuilder = getIndexBuilderMapRedTask(inputs, outputs, index.getSd().getCols(), false,
- new PartitionDesc(desc, null), indexTbl.getTableName(),
- new PartitionDesc(Utilities.getTableDesc(baseTbl), null),
- baseTbl.getTableName(), indexTbl.getDbName());
- indexBuilderTasks.add(indexBuilder);
- } else {
-
- // check whether the index table partitions are still exists in base
- // table
- for (int i = 0; i < indexTblPartitions.size(); i++) {
- Partition indexPart = indexTblPartitions.get(i);
- Partition basePart = null;
- for (int j = 0; j < baseTblPartitions.size(); j++) {
- if (baseTblPartitions.get(j).getName().equals(indexPart.getName())) {
- basePart = baseTblPartitions.get(j);
- newBaseTblPartitions.add(baseTblPartitions.get(j));
- break;
- }
- }
- if (basePart == null)
- throw new RuntimeException(
- "Partitions of base table and index table are inconsistent.");
- // for each partition, spawn a map reduce task.
- Task<?> indexBuilder = getIndexBuilderMapRedTask(inputs, outputs, index.getSd().getCols(), true,
- new PartitionDesc(indexPart), indexTbl.getTableName(),
- new PartitionDesc(basePart), baseTbl.getTableName(), indexTbl.getDbName());
- indexBuilderTasks.add(indexBuilder);
- }
- }
- return indexBuilderTasks;
- } catch (Exception e) {
- throw new SemanticException(e);
- }
- }
-
- private Task<?> getIndexBuilderMapRedTask(Set<ReadEntity> inputs, Set<WriteEntity> outputs,
+ protected Task<?> getIndexBuilderMapRedTask(Set<ReadEntity> inputs, Set<WriteEntity> outputs,
List<FieldSchema> indexField, boolean partitioned,
PartitionDesc indexTblPartDesc, String indexTableName,
PartitionDesc baseTablePartDesc, String baseTableName, String dbName) {
-
+
String indexCols = HiveUtils.getUnparsedColumnNamesFromFieldSchema(indexField);
//form a new insert overwrite query.
@@ -138,12 +85,13 @@ public class CompactIndexHandler extends AbstractIndexHandler {
for (int i = 0; i < ret.size(); i++) {
String partKV = ret.get(i);
command.append(partKV);
- if (i < ret.size() - 1)
+ if (i < ret.size() - 1) {
command.append(",");
+ }
}
command.append(" ) ");
}
-
+
command.append(" SELECT ");
command.append(indexCols);
command.append(",");
@@ -161,8 +109,9 @@ public class CompactIndexHandler extends AbstractIndexHandler {
for (int i = 0; i < pkv.size(); i++) {
String partKV = pkv.get(i);
command.append(partKV);
- if (i < pkv.size() - 1)
+ if (i < pkv.size() - 1) {
command.append(" AND ");
+ }
}
}
command.append(" GROUP BY ");
@@ -174,45 +123,12 @@ public class CompactIndexHandler extends AbstractIndexHandler {
Task> rootTask = driver.getPlan().getRootTasks().get(0);
inputs.addAll(driver.getPlan().getInputs());
outputs.addAll(driver.getPlan().getOutputs());
-
+
IndexMetadataChangeWork indexMetaChange = new IndexMetadataChangeWork(partSpec, indexTableName, dbName);
- IndexMetadataChangeTask indexMetaChangeTsk = new IndexMetadataChangeTask();
+ IndexMetadataChangeTask indexMetaChangeTsk = new IndexMetadataChangeTask();
indexMetaChangeTsk.setWork(indexMetaChange);
rootTask.addDependentTask(indexMetaChangeTsk);
return rootTask;
}
-
- private List<String> getPartKVPairStringArray(
- LinkedHashMap<String, String> partSpec) {
- List<String> ret = new ArrayList<String>(partSpec.size());
- Iterator<Entry<String, String>> iter = partSpec.entrySet().iterator();
- while (iter.hasNext()) {
- StringBuilder sb = new StringBuilder();
- Entry<String, String> p = iter.next();
- sb.append(HiveUtils.unparseIdentifier(p.getKey()));
- sb.append(" = ");
- sb.append("'");
- sb.append(p.getValue());
- sb.append("'");
- ret.add(sb.toString());
- }
- return ret;
- }
-
- @Override
- public boolean usesIndexTable() {
- return true;
- }
-
- @Override
- public Configuration getConf() {
- return configuration;
- }
-
- @Override
- public void setConf(Configuration conf) {
- this.configuration = conf;
- }
-
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/index/compact/HiveCompactIndexInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/index/compact/HiveCompactIndexInputFormat.java
index 6c320c5..bcbafcc 100644
--- ql/src/java/org/apache/hadoop/hive/ql/index/compact/HiveCompactIndexInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/index/compact/HiveCompactIndexInputFormat.java
@@ -15,136 +15,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hive.ql.index.compact;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Set;
+package org.apache.hadoop.hive.ql.index.compact;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
-import org.apache.hadoop.hive.ql.io.HiveInputFormat;
-import org.apache.hadoop.hive.ql.io.IOPrepareCache;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.plan.PartitionDesc;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.hive.ql.index.HiveIndexedInputFormat;
-public class HiveCompactIndexInputFormat extends HiveInputFormat {
+public class HiveCompactIndexInputFormat extends HiveIndexedInputFormat {
- public static final Log l4j = LogFactory.getLog("HiveIndexInputFormat");
+ public static final Log l4j =
+ LogFactory.getLog(HiveCompactIndexInputFormat.class.getSimpleName());
+ public final String indexFile = "hive.index.compact.file";
public HiveCompactIndexInputFormat() {
super();
}
-
- public InputSplit[] doGetSplits(JobConf job, int numSplits) throws IOException {
-
- super.init(job);
-
- Path[] dirs = FileInputFormat.getInputPaths(job);
- if (dirs.length == 0) {
- throw new IOException("No input paths specified in job");
- }
- JobConf newjob = new JobConf(job);
- ArrayList<HiveInputSplit> result = new ArrayList<HiveInputSplit>();
-
- // for each dir, get the InputFormat, and do getSplits.
- for (Path dir : dirs) {
- PartitionDesc part = HiveFileFormatUtils
- .getPartitionDescFromPathRecursively(pathToPartitionInfo, dir,
- IOPrepareCache.get().allocatePartitionDescMap(), true);
- // create a new InputFormat instance if this is the first time to see this
- // class
- Class inputFormatClass = part.getInputFileFormatClass();
- InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
- Utilities.copyTableJobPropertiesToConf(part.getTableDesc(), newjob);
-
- FileInputFormat.setInputPaths(newjob, dir);
- newjob.setInputFormat(inputFormat.getClass());
- InputSplit[] iss = inputFormat.getSplits(newjob, numSplits / dirs.length);
- for (InputSplit is : iss) {
- result.add(new HiveInputSplit(is, inputFormatClass.getName()));
- }
- }
- return result.toArray(new HiveInputSplit[result.size()]);
- }
-
- @Override
- public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
- String indexFileStr = job.get("hive.index.compact.file");
- l4j.info("index_file is " + indexFileStr);
-
- HiveCompactIndexResult hiveIndexResult = null;
- if (indexFileStr != null) {
- try {
- hiveIndexResult = new HiveCompactIndexResult(indexFileStr, job);
- } catch (HiveException e) {
- l4j.error("Unable to read index..");
- throw new IOException(e);
- }
-
- Set<String> inputFiles = hiveIndexResult.buckets.keySet();
- Iterator<String> iter = inputFiles.iterator();
- boolean first = true;
- StringBuilder newInputPaths = new StringBuilder();
- while(iter.hasNext()) {
- String path = iter.next();
- if (path.trim().equalsIgnoreCase(""))
- continue;
- if (!first) {
- newInputPaths.append(",");
- } else {
- first = false;
- }
- newInputPaths.append(path);
- }
-
- FileInputFormat.setInputPaths(job, newInputPaths.toString());
- } else {
- return super.getSplits(job, numSplits);
- }
-
- HiveInputSplit[] splits = (HiveInputSplit[]) this.doGetSplits(job, numSplits);
-
- ArrayList<HiveInputSplit> newSplits = new ArrayList<HiveInputSplit>(
- numSplits);
- for (HiveInputSplit split : splits) {
- l4j.info("split start : " + split.getStart());
- l4j.info("split end : " + (split.getStart() + split.getLength()));
-
- try {
- if (hiveIndexResult.contains(split)) {
- // we may miss a sync here
- HiveInputSplit newSplit = split;
- if (split.inputFormatClassName().contains("RCFile")
- || split.inputFormatClassName().contains("SequenceFile")) {
- if (split.getStart() > SequenceFile.SYNC_INTERVAL) {
- newSplit = new HiveInputSplit(new FileSplit(split.getPath(), split
- .getStart()
- - SequenceFile.SYNC_INTERVAL, split.getLength()
- + SequenceFile.SYNC_INTERVAL, split.getLocations()), split
- .inputFormatClassName());
- }
- }
- newSplits.add(newSplit);
- }
- } catch (HiveException e) {
- throw new RuntimeException(
- "Unable to get metadata for input table split" + split.getPath());
- }
- }
- InputSplit retA[] = newSplits.toArray((new FileSplit[newSplits.size()]));
- l4j.info("Number of input splits: " + splits.length + " new input splits: "
- + retA.length);
- return retA;
- }
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/index/compact/HiveCompactIndexResult.java ql/src/java/org/apache/hadoop/hive/ql/index/compact/HiveCompactIndexResult.java
deleted file mode 100644
index 0c9ccea..0000000
--- ql/src/java/org/apache/hadoop/hive/ql/index/compact/HiveCompactIndexResult.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.index.compact;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
-import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.LineRecordReader.LineReader;
-
-public class HiveCompactIndexResult {
-
- public static final Log l4j = LogFactory.getLog("HiveCompactIndexResult");
-
- // IndexBucket
- static class IBucket {
- private String name = null;
- private SortedSet<Long> offsets = new TreeSet<Long>();
-
- public IBucket(String n) {
- name = n;
- }
-
- public void add(Long offset) {
- offsets.add(offset);
- }
-
- public String getName() {
- return name;
- }
-
- public SortedSet<Long> getOffsets() {
- return offsets;
- }
-
- public boolean equals(Object obj) {
- if (obj.getClass() != this.getClass()) {
- return false;
- }
- return (((IBucket) obj).name.compareToIgnoreCase(this.name) == 0);
- }
- }
-
- JobConf job = null;
- BytesRefWritable[] bytesRef = new BytesRefWritable[2];
- boolean ignoreHdfsLoc = false;
-
- public HiveCompactIndexResult(String indexFile, JobConf conf) throws IOException,
- HiveException {
- job = conf;
-
- bytesRef[0] = new BytesRefWritable();
- bytesRef[1] = new BytesRefWritable();
- ignoreHdfsLoc = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_INDEX_IGNORE_HDFS_LOC);
-
- if (indexFile != null) {
- Path indexFilePath = new Path(indexFile);
- FileSystem fs = FileSystem.get(conf);
- FileStatus indexStat = fs.getFileStatus(indexFilePath);
- List<Path> paths = new ArrayList<Path>();
- if (indexStat.isDir()) {
- FileStatus[] fss = fs.listStatus(indexFilePath);
- for (FileStatus f : fss) {
- paths.add(f.getPath());
- }
- } else {
- paths.add(indexFilePath);
- }
-
- for (Path indexFinalPath : paths) {
- FSDataInputStream ifile = fs.open(indexFinalPath);
- LineReader lr = new LineReader(ifile, conf);
- Text line = new Text();
- while (lr.readLine(line) > 0) {
- add(line);
- }
- // this will close the input stream
- lr.close();
- }
- }
- }
-
- Map<String, IBucket> buckets = new HashMap<String, IBucket>();
-
- private void add(Text line) throws HiveException {
- String l = line.toString();
- byte[] bytes = l.getBytes();
- int firstEnd = 0;
- int i = 0;
- for (int index = 0; index < bytes.length; index++) {
- if (bytes[index] == LazySimpleSerDe.DefaultSeparators[0]) {
- i++;
- firstEnd = index;
- }
- }
- if (i > 1) {
- throw new HiveException(
- "Bad index file row (index file should only contain two columns: bucket_file_name and offset lists.) ."
- + line.toString());
- }
- String bucketFileName = new String(bytes, 0, firstEnd);
-
- if (ignoreHdfsLoc) {
- Path tmpPath = new Path(bucketFileName);
- bucketFileName = tmpPath.toUri().getPath();
- }
- IBucket bucket = buckets.get(bucketFileName);
- if (bucket == null) {
- bucket = new IBucket(bucketFileName);
- buckets.put(bucketFileName, bucket);
- }
-
- int currentStart = firstEnd + 1;
- int currentEnd = firstEnd + 1;
- for (; currentEnd < bytes.length; currentEnd++) {
- if (bytes[currentEnd] == LazySimpleSerDe.DefaultSeparators[1]) {
- String one_offset = new String(bytes, currentStart, currentEnd
- - currentStart);
- Long offset = Long.parseLong(one_offset);
- bucket.getOffsets().add(offset);
- currentStart = currentEnd + 1;
- }
- }
- String one_offset = new String(bytes, currentStart, currentEnd
- - currentStart);
- bucket.getOffsets().add(Long.parseLong(one_offset));
- }
-
- public boolean contains(FileSplit split) throws HiveException {
-
- if (buckets == null) {
- return false;
- }
- String bucketName = split.getPath().toString();
- IBucket bucket = buckets.get(bucketName);
- if (bucket == null) {
- bucketName = split.getPath().toUri().getPath();
- bucket = buckets.get(bucketName);
- if (bucket == null) {
- return false;
- }
- }
-
- for (Long offset : bucket.getOffsets()) {
- if ((offset >= split.getStart())
- && (offset <= split.getStart() + split.getLength())) {
- return true;
- }
- }
- return false;
- }
-}
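
The deleted class read compact-index files in which every line carries a bucket file name and a list of row offsets, separated by LazySimpleSerDe's default delimiters (Ctrl-A between the two columns, Ctrl-B between offsets). The following is a minimal, self-contained sketch of that line format and its parsing; the class and variable names are illustrative only and are not part of this patch.

import java.util.ArrayList;
import java.util.List;

// Illustrative only: parses one compact-index line of the form
//   <bucket_file_name>\001<offset>\002<offset>\002...
// where \001 and \002 are LazySimpleSerDe's default field and
// collection separators.
public class CompactIndexLineSketch {
  public static void main(String[] args) {
    String line = "hdfs://nn/warehouse/t/bucket_00000" + '\001'
        + "0" + '\002' + "4096" + '\002' + "8192";

    int sep = line.indexOf('\001');
    String bucketFile = line.substring(0, sep);

    List<Long> offsets = new ArrayList<Long>();
    for (String s : line.substring(sep + 1).split("\002")) {
      offsets.add(Long.parseLong(s));
    }

    System.out.println(bucketFile + " -> " + offsets);
  }
}
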
diff --git ql/src/java/org/apache/hadoop/hive/ql/index/compact/IndexMetadataChangeTask.java ql/src/java/org/apache/hadoop/hive/ql/index/compact/IndexMetadataChangeTask.java
deleted file mode 100644
index eac168f..0000000
--- ql/src/java/org/apache/hadoop/hive/ql/index/compact/IndexMetadataChangeTask.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.index.compact;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.DriverContext;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.index.HiveIndex;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.Partition;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.plan.api.StageType;
-
-public class IndexMetadataChangeTask extends Task<IndexMetadataChangeWork> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- protected int execute(DriverContext driverContext) {
-
- try {
- Hive db = Hive.get(conf);
- IndexMetadataChangeWork work = this.getWork();
- String tblName = work.getIndexTbl();
- Table tbl = db.getTable(work.getDbName(), tblName);
- if (tbl == null ) {
- console.printError("Index table can not be null.");
- return 1;
- }
-
- if (!tbl.getTableType().equals(TableType.INDEX_TABLE)) {
- console.printError("Table " + tbl.getTableName() + " not specified.");
- return 1;
- }
-
- if (tbl.isPartitioned() && work.getPartSpec() == null) {
- console.printError("Index table is partitioned, but no partition specified.");
- return 1;
- }
-
- if (work.getPartSpec() != null) {
- Partition part = db.getPartition(tbl, work.getPartSpec(), false);
- if (part == null) {
- console.printError("Partition " +
- Warehouse.makePartName(work.getPartSpec(), false).toString()
- + " does not exist.");
- return 1;
- }
-
- Path url = new Path(part.getDataLocation().toString());
- FileSystem fs = url.getFileSystem(conf);
- FileStatus fstat = fs.getFileStatus(url);
-
- part.getParameters().put(HiveIndex.INDEX_TABLE_CREATETIME, Long.toString(fstat.getModificationTime()));
- db.alterPartition(tbl.getTableName(), part);
- } else {
- Path url = new Path(tbl.getDataLocation().toString());
- FileSystem fs = url.getFileSystem(conf);
- FileStatus fstat = fs.getFileStatus(url);
- tbl.getParameters().put(HiveIndex.INDEX_TABLE_CREATETIME, Long.toString(fstat.getModificationTime()));
- db.alterTable(tbl.getTableName(), tbl);
- }
- } catch (Exception e) {
- e.printStackTrace();
- console.printError("Error changing index table/partition metadata "
- + e.getMessage());
- return 1;
- }
- return 0;
- }
-
- @Override
- public String getName() {
- return "IndexMetadataChangeTask";
- }
-
- @Override
- public StageType getType() {
- return StageType.DDL;
- }
-
- @Override
- protected void localizeMRTmpFilesImpl(Context ctx) {
- }
-
-}
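
The removed task stamped the index table (or partition) with the modification time of its data location, so a later staleness check can compare it against the base table. Below is a rough, standalone sketch of that idea using plain Hadoop filesystem calls; the class name and parameter key are placeholders (the task above uses the HiveIndex.INDEX_TABLE_CREATETIME constant), not Hive's actual API.

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative only: records a data directory's modification time in a
// parameter map, mirroring the "index create time" bookkeeping above.
public class IndexTimestampSketch {
  public static Map<String, String> stampCreateTime(Configuration conf,
      String dataLocation, Map<String, String> params) throws Exception {
    Path p = new Path(dataLocation);
    FileSystem fs = p.getFileSystem(conf);
    FileStatus stat = fs.getFileStatus(p);
    // "index_create_time" is a placeholder key for this sketch.
    params.put("index_create_time",
        Long.toString(stat.getModificationTime()));
    return params;
  }

  public static void main(String[] args) throws Exception {
    Map<String, String> params = new HashMap<String, String>();
    stampCreateTime(new Configuration(), args[0], params);
    System.out.println(params);
  }
}
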
diff --git ql/src/java/org/apache/hadoop/hive/ql/index/compact/IndexMetadataChangeWork.java ql/src/java/org/apache/hadoop/hive/ql/index/compact/IndexMetadataChangeWork.java
deleted file mode 100644
index 26beb4e..0000000
--- ql/src/java/org/apache/hadoop/hive/ql/index/compact/IndexMetadataChangeWork.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.index.compact;
-
-import java.io.Serializable;
-import java.util.HashMap;
-
-public class IndexMetadataChangeWork implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private HashMap<String, String> partSpec;
- private String indexTbl;
- private String dbName;
-
- public IndexMetadataChangeWork() {
- }
-
- public IndexMetadataChangeWork(HashMap<String, String> partSpec,
- String indexTbl, String dbName) {
- super();
- this.partSpec = partSpec;
- this.indexTbl = indexTbl;
- this.dbName = dbName;
- }
-
- public HashMap<String, String> getPartSpec() {
- return partSpec;
- }
-
- public void setPartSpec(HashMap<String, String> partSpec) {
- this.partSpec = partSpec;
- }
-
- public String getIndexTbl() {
- return indexTbl;
- }
-
- public void setIndexTbl(String indexTbl) {
- this.indexTbl = indexTbl;
- }
-
- public String getDbName() {
- return dbName;
- }
-
- public void setDbName(String dbName) {
- this.dbName = dbName;
- }
-
-}
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
index 391e5de..6abc6cd 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
@@ -76,16 +76,26 @@ public abstract class HiveContextAwareRecordReader implements RecordReader
long pointerPos = this.getPos();
if (!ioCxtRef.isBlockPointer) {
ioCxtRef.currentBlockStart = pointerPos;
+ ioCxtRef.currentRow = 0;
return;
}
+ ioCxtRef.currentRow++;
+
if (ioCxtRef.nextBlockStart == -1) {
ioCxtRef.nextBlockStart = pointerPos;
+ ioCxtRef.currentRow = 0;
}
if (pointerPos != ioCxtRef.nextBlockStart) {
// the reader pointer has moved to the end of next block, or the end of
// current record.
+ ioCxtRef.currentRow = 0;
+
+ if (ioCxtRef.currentBlockStart == ioCxtRef.nextBlockStart) {
+ ioCxtRef.currentRow = 1;
+ }
+
ioCxtRef.currentBlockStart = ioCxtRef.nextBlockStart;
ioCxtRef.nextBlockStart = pointerPos;
}
@@ -133,4 +143,4 @@ public abstract class HiveContextAwareRecordReader implements RecordReader
}
this.initIOContext(blockStart, blockPointer, split.getPath().toString());
}
-}
\ No newline at end of file
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
index 77220a1..8091970 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
@@ -41,6 +41,7 @@ public class IOContext {
long currentBlockStart;
long nextBlockStart;
+ long currentRow;
boolean isBlockPointer;
boolean ioExceptions;
@@ -49,6 +50,7 @@ public class IOContext {
public IOContext() {
this.currentBlockStart = 0;
this.nextBlockStart = -1;
+ this.currentRow = 0;
this.isBlockPointer = true;
this.ioExceptions = false;
}
@@ -69,6 +71,14 @@ public class IOContext {
this.nextBlockStart = nextBlockStart;
}
+ public long getCurrentRow() {
+ return currentRow;
+ }
+
+ public void setCurrentRow(long currentRow) {
+ this.currentRow = currentRow;
+ }
+
public boolean isBlockPointer() {
return isBlockPointer;
}
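
Taken together, the two hunks above give each record a position counter: currentRow is advanced for every row read and reset to 0 whenever the reader crosses into a new block, so IOContext.getCurrentRow() exposes the row's offset inside its block. The snippet below is a simplified, self-contained sketch of that bookkeeping, not the Hive classes themselves; the actual patch derives block boundaries from the reader's byte position.

// Illustrative only: mimics the currentBlockStart / currentRow bookkeeping
// added to IOContext and HiveContextAwareRecordReader above.
public class RowOffsetSketch {
  private long currentBlockStart = -1;
  private long currentRow = -1;

  // Called once per record with the block offset the record belongs to.
  void recordRead(long blockStart) {
    if (blockStart != currentBlockStart) {
      currentBlockStart = blockStart;
      currentRow = 0;          // first row of a new block
    } else {
      currentRow++;            // next row within the same block
    }
  }

  long getCurrentRow() {
    return currentRow;
  }

  public static void main(String[] args) {
    RowOffsetSketch ctx = new RowOffsetSketch();
    long[] blockOffsets = {0, 0, 0, 4096, 4096};
    for (long b : blockOffsets) {
      ctx.recordRead(b);
      System.out.println("block " + b + " row " + ctx.getCurrentRow());
    }
    // Prints row offsets 0, 1, 2 for the first block, then 0, 1 for the next.
  }
}
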
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java
index 30714b8..6107fb3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java
@@ -32,10 +32,12 @@ public class VirtualColumn implements Serializable {
public static VirtualColumn FILENAME = new VirtualColumn("INPUT__FILE__NAME", (PrimitiveTypeInfo)TypeInfoFactory.stringTypeInfo);
public static VirtualColumn BLOCKOFFSET = new VirtualColumn("BLOCK__OFFSET__INSIDE__FILE", (PrimitiveTypeInfo)TypeInfoFactory.longTypeInfo);
+ public static VirtualColumn ROWOFFSET = new VirtualColumn("ROW__OFFSET__INSIDE__BLOCK", (PrimitiveTypeInfo)TypeInfoFactory.longTypeInfo);
static {
registry.put(FILENAME.name, FILENAME);
registry.put(BLOCKOFFSET.name, BLOCKOFFSET);
+ registry.put(ROWOFFSET.name, ROWOFFSET);
}
private String name;
@@ -95,4 +97,4 @@ public class VirtualColumn implements Serializable {
&& this.typeInfo.getTypeName().equals(c.getTypeInfo().getTypeName());
}
-}
\ No newline at end of file
+}
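
With BLOCK__OFFSET__INSIDE__FILE and the new ROW__OFFSET__INSIDE__BLOCK, every row can be addressed by a (block, row) pair; the EWAH bitmap UDAF/UDFs record those positions in compressed bitmaps and combine them with AND/OR. Below is a minimal sketch against the bundled javaewah library; it assumes only set, and, and or on EWAHCompressedBitmap (as used by the UDFs in this patch) and is not the Hive UDF code itself.

import javaewah.EWAHCompressedBitmap;

// Illustrative only: builds two bitmaps of row positions and combines
// them, the core operation behind ewah_bitmap_and / ewah_bitmap_or.
public class EwahBitmapSketch {
  public static void main(String[] args) {
    EWAHCompressedBitmap rowsMatchingA = new EWAHCompressedBitmap();
    EWAHCompressedBitmap rowsMatchingB = new EWAHCompressedBitmap();

    // Bit positions stand in for row offsets; EWAH bitmaps expect bits
    // to be set in increasing order.
    rowsMatchingA.set(1);
    rowsMatchingA.set(5);
    rowsMatchingA.set(9);

    rowsMatchingB.set(5);
    rowsMatchingB.set(7);
    rowsMatchingB.set(9);

    EWAHCompressedBitmap both = rowsMatchingA.and(rowsMatchingB);
    EWAHCompressedBitmap either = rowsMatchingA.or(rowsMatchingB);

    System.out.println("AND: " + both);
    System.out.println("OR:  " + either);
  }
}
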
diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/AbstractGenericUDFEWAHBitmapOp.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/AbstractGenericUDFEWAHBitmapOp.java
new file mode 100644
index 0000000..a55e379
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/AbstractGenericUDFEWAHBitmapOp.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import javaewah.EWAHCompressedBitmap;
+
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.index.bitmap.BitmapObjectInput;
+import org.apache.hadoop.hive.ql.index.bitmap.BitmapObjectOutput;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableVoidObjectInspector;
+import org.apache.hadoop.io.LongWritable;
+
+abstract public class AbstractGenericUDFEWAHBitmapOp extends GenericUDF {
+protected final ArrayList