Index: build.xml
===================================================================
--- build.xml (revision 948363)
+++ build.xml (working copy)
@@ -116,7 +116,7 @@
+ excludes="*hadoop*.jar"/>
Index: metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (revision 948363)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (working copy)
@@ -603,6 +603,7 @@
assert(e instanceof RuntimeException);
throw (RuntimeException)e;
}
+
return ret;
}
@@ -651,24 +652,14 @@
try {
ms.openTransaction();
if (!TableType.VIRTUAL_VIEW.toString().equals(tbl.getTableType())) {
- if (tbl.getSd().getLocation() == null
- || tbl.getSd().getLocation().isEmpty()) {
- tblPath = wh.getDefaultTablePath(
- tbl.getDbName(), tbl.getTableName());
- } else {
- if (!isExternal(tbl) && !MetaStoreUtils.isNonNativeTable(tbl)) {
- LOG.warn("Location: " + tbl.getSd().getLocation()
- + " specified for non-external table:" + tbl.getTableName());
- }
- tblPath = wh.getDnsPath(new Path(tbl.getSd().getLocation()));
- }
+ tblPath = getTableLocation(tbl);
tbl.getSd().setLocation(tblPath.toString());
}
// get_table checks whether database exists, it should be moved here
if (is_table_exists(tbl.getDbName(), tbl.getTableName())) {
- throw new AlreadyExistsException("Table " + tbl.getTableName()
- + " already exists");
+ throw new AlreadyExistsException(
+ "The name is already used by an existing table or index, please choose another one");
}
if (tblPath != null) {
@@ -699,6 +690,21 @@
}
}
+ private Path getTableLocation(Table tbl) throws MetaException {
+ Path tblPath;
+ if (tbl.getSd().getLocation() == null
+ || tbl.getSd().getLocation().isEmpty()) {
+ tblPath = wh.getDefaultTablePath(tbl.getDbName(), tbl.getTableName());
+ } else {
+ if (!isExternal(tbl)) {
+ LOG.warn("Location: " + tbl.getSd().getLocation()
+ + "specified for non-external table:" + tbl.getTableName());
+ }
+ tblPath = wh.getDnsPath(new Path(tbl.getSd().getLocation()));
+ }
+ return tblPath;
+ }
+
public void create_table(final Table tbl) throws AlreadyExistsException,
MetaException, InvalidObjectException {
incrementCounter("create_table");
@@ -1242,13 +1248,125 @@
return;
}
- public boolean create_index(Index index_def)
- throws IndexAlreadyExistsException, MetaException {
- incrementCounter("create_index");
- // TODO Auto-generated method stub
- throw new MetaException("Not yet implemented");
+
+ public void create_index(Index index_def) throws AlreadyExistsException,
+ MetaException, NoSuchObjectException, InvalidObjectException {
+ this.incrementCounter("create_index");
+ validateCreateIndex(index_def);
+ createIndexTableEntry(index_def);
+
+ List<Partition> partitions = new ArrayList<Partition>();
+ if (index_def.getPartName() != null) {
+ Partition part = get_partition(index_def.getDbName(), index_def
+ .getTableName(), Warehouse.getPartValuesFromPartName(index_def
+ .getPartName()));
+ partitions.add(part);
+ } else {
+ partitions = get_partitions(index_def.getDbName(), index_def.getTableName(), Short.MAX_VALUE);
+ }
+
+ if (partitions.size() > 0) {
+ for (Partition part : partitions) {
+ part.setTableName(index_def.getIndexName());
+ part.setCreateTime(getCurrentTime());
+ part.setLastAccessTime(getCurrentTime());
+ part.getSd().setBucketCols(null);
+ part.getSd().setLocation(null);
+ part.getSd().setNumBuckets(1);
+ add_partition(part);
+ }
+ }
}
+ private void createIndexTableEntry(Index index_def) throws MetaException,
+ NoSuchObjectException, InvalidObjectException {
+ if (!is_table_exists(index_def.getDbName(), index_def.getIndexName())) {
+ // we need to first create one table entry for the index table
+ Table tbl = get_table(index_def.getDbName(), index_def.getTableName()).clone();
+ tbl.setCreateTime(getCurrentTime());
+ tbl.setParameters(null);
+ MetaStoreUtils.setIndexTable(tbl);
+ MetaStoreUtils.setBaseTableOfIndexTable(tbl, index_def.getTableName());
+ MetaStoreUtils.setIndexType(tbl, index_def.getIndexType());
+ tbl.setTableName(index_def.getIndexName());
+ tbl.setLastAccessTime(getCurrentTime());
+ tbl.getSd().setBucketCols(null);
+ List<FieldSchema> indexedCols = new ArrayList<FieldSchema>();
+ List<String> indexColumns = index_def.getColNames();
+ int k = 0;
+ for (int i = 0; i < tbl.getSd().getCols().size(); i++) {
+ FieldSchema col = tbl.getSd().getCols().get(i);
+ if (indexColumns.contains(col.getName())) {
+ indexedCols.add(col);
+ k++;
+ }
+ }
+ if (k != indexColumns.size())
+ throw new RuntimeException(
+ "Check the index columns; they must all appear in the table being indexed.");
+ tbl.getSd().setCols(indexedCols);
+ tbl.getSd().setLocation(null);
+ Path tblPath = getTableLocation(tbl);
+ tbl.getSd().setLocation(tblPath.toString());
+ tbl.getSd().setNumBuckets(1);
+ tbl.getSd().setInputFormat("org.apache.hadoop.mapred.TextInputFormat");
+ tbl.getSd().setOutputFormat("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat");
+ getMS(false).createTable(tbl);
+ }
+ }
+
+ private int getCurrentTime() {
+ return (int) (System.currentTimeMillis() / 1000);
+ }
+
+ private void validateCreateIndex(Index index_def) throws MetaException, AlreadyExistsException {
+ Index index = null;
+ try {
+ index = get_index(index_def.getDbName(), index_def.getTableName(), index_def.getIndexName(), index_def.getPartName());
+ } catch (NoSuchObjectException e) {
+ // no index with this name exists yet; nothing to validate against
+ }
+
+ /*
+ * Throw if the index name is already used by an existing table, or if the
+ * index already exists but no partition is specified in index_def.
+ */
+ if ((index == null && is_table_exists(index_def.getDbName(), index_def
+ .getIndexName()))
+ || (index != null && index_def.getPartName() == null)) {
+ throw new AlreadyExistsException(
+ "The name is already used by an existing table or index, please choose another one");
+ }
+
+ }
+
+ public Index get_index(String dbName, String tableName, String indexName,
+ String partName) throws MetaException, NoSuchObjectException {
+ this.incrementCounter("get_index");
+ Table t = get_table(dbName, indexName);
+ boolean isIndexTable = MetaStoreUtils.isIndexTable(t);
+ if (isIndexTable
+ && MetaStoreUtils.getBaseTableNameOfIndexTable(t).equals(tableName)) {
+ if (partName != null) {
+ Partition part = get_partition(dbName, tableName, Warehouse.getPartValuesFromPartName(partName));
+ if (part == null)
+ throw new NoSuchObjectException("index not found");
+ }
+
+ String indexType = MetaStoreUtils.getIndexType(t);
+ List<String> cols = new ArrayList<String>();
+ List<FieldSchema> fieldSchemas = t.getSd().getCols();
+ for (int i = 0; i < fieldSchemas.size(); i++) {
+ cols.add(fieldSchemas.get(i).getName());
+ }
+
+ Index index = new Index(indexName, indexType, tableName, dbName, cols,
+ partName);
+ return index;
+ }
+ throw new NoSuchObjectException("index not found");
+ }
+
public String getVersion() throws TException {
incrementCounter("getVersion");
logStartFunction("getVersion");
Index: metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java (revision 948363)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java (working copy)
@@ -328,4 +328,13 @@
}
return FileUtils.makePartName(colNames, vals);
}
+
+ public static List<String> getPartValuesFromPartName(String partName)
+ throws MetaException {
+ LinkedHashMap<String, String> partSpec = Warehouse.makeSpecFromName(partName);
+ List<String> values = new ArrayList<String>();
+ values.addAll(partSpec.values());
+ return values;
+ }
+
}
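
The helper above just flattens a partition name into its value list, in
partition-key order, by reusing the existing Warehouse.makeSpecFromName(). A
minimal usage sketch (the partition name and the surrounding class are
illustrative, not part of the patch):

    import java.util.List;
    import org.apache.hadoop.hive.metastore.Warehouse;
    import org.apache.hadoop.hive.metastore.api.MetaException;

    public class PartNameSketch {
      public static void main(String[] args) throws MetaException {
        // "ds=2008-04-08/hr=11" parses to the spec {ds=2008-04-08, hr=11};
        // the helper returns just the values, keeping the key order.
        List<String> vals = Warehouse.getPartValuesFromPartName("ds=2008-04-08/hr=11");
        System.out.println(vals); // expected: [2008-04-08, 11]
      }
    }
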
Index: metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java (revision 948363)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java (working copy)
@@ -863,4 +863,27 @@
}
return true;
}
+ public static final String INDEX_TABLE_PROPERTY = "INDEX_TABLE";
+ public static final String INDEX_BASE_TABLE_PROPERTY = "INDEX_ORIGIN_TABLE";
+ public static final String INDEX_TYPE_PROPERTY = "INDEX_TYPE";
+
+ public static String getBaseTableNameOfIndexTable(Table indextbl) {
+ return indextbl.getParameters().get(INDEX_BASE_TABLE_PROPERTY);
+ }
+ public static boolean isIndexTable(Table t) {
+ // null-safe: ordinary tables will not carry this parameter at all
+ return "true".equalsIgnoreCase(t.getParameters().get(INDEX_TABLE_PROPERTY));
+ }
+ public static String getIndexType(Table t) {
+ return t.getParameters().get(INDEX_TYPE_PROPERTY);
+ }
+ public static void setIndexTable(Table tbl) {
+ tbl.putToParameters(INDEX_TABLE_PROPERTY, "true");
+ }
+ public static void setBaseTableOfIndexTable(Table tbl, String tableName) {
+ tbl.putToParameters(INDEX_BASE_TABLE_PROPERTY, tableName);
+ }
+ public static void setIndexType(Table tbl, String indexType) {
+ tbl.putToParameters(INDEX_TYPE_PROPERTY, indexType);
+ }
+
}
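
The new constants and helpers record index metadata as plain table parameters.
A small sketch of how an index table ends up tagged and queried; the Table
construction here is hypothetical, only the parameter keys and the COMPACT type
name come from the patch:

    import java.util.HashMap;
    import org.apache.hadoop.hive.metastore.MetaStoreUtils;
    import org.apache.hadoop.hive.metastore.api.Table;

    public class IndexTagSketch {
      public static void main(String[] args) {
        Table tbl = new Table();
        tbl.setParameters(new HashMap<String, String>());

        // Mark the table as a COMPACT index over base table "srcpart".
        MetaStoreUtils.setIndexTable(tbl);                        // INDEX_TABLE=true
        MetaStoreUtils.setBaseTableOfIndexTable(tbl, "srcpart");  // INDEX_ORIGIN_TABLE=srcpart
        MetaStoreUtils.setIndexType(tbl, "COMPACT");              // INDEX_TYPE=COMPACT

        System.out.println(MetaStoreUtils.isIndexTable(tbl));                 // true
        System.out.println(MetaStoreUtils.getBaseTableNameOfIndexTable(tbl)); // srcpart
        System.out.println(MetaStoreUtils.getIndexType(tbl));                 // COMPACT
      }
    }
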
Index: metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java (revision 948363)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java (working copy)
@@ -24,6 +24,8 @@
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.ConfigValSecurityException;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Index;
+import org.apache.hadoop.hive.metastore.api.IndexAlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -329,4 +331,16 @@
*/
public Map<String, String> partitionNameToSpec(String name)
throws MetaException, TException;
+
+ /**
+ * create an index
+ * @param index the index object
+ * @throws InvalidObjectException
+ * @throws MetaException
+ * @throws NoSuchObjectException
+ * @throws TException
+ * @throws AlreadyExistsException
+ */
+ public void createIndex(Index index) throws InvalidObjectException,
+ MetaException, NoSuchObjectException, TException, AlreadyExistsException;
}
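
For reference, a rough sketch of driving the new call end to end from a client;
the HiveConf-based connection setup and the choice of table and columns are
assumptions for illustration, not part of the patch:

    import java.util.Arrays;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
    import org.apache.hadoop.hive.metastore.api.Index;

    public class CreateIndexSketch {
      public static void main(String[] args) throws Exception {
        HiveMetaStoreClient client = new HiveMetaStoreClient(new HiveConf());
        // Index(indexName, indexType, tableName, dbName, colNames, partName);
        // partName == null means the index covers all partitions.
        Index idx = new Index("srcpart_index_proj", "COMPACT", "srcpart",
            "default", Arrays.asList("key"), null);
        client.createIndex(idx);
        client.close();
      }
    }
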
Index: metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java (revision 948363)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java (working copy)
@@ -30,6 +30,8 @@
import org.apache.hadoop.hive.metastore.api.ConfigValSecurityException;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Index;
+import org.apache.hadoop.hive.metastore.api.IndexAlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -589,6 +591,11 @@
return client.get_fields(db, tableName);
}
+ @Override
+ public void createIndex(Index index) throws AlreadyExistsException, InvalidObjectException, MetaException, NoSuchObjectException, TException {
+ client.create_index(index);
+ }
+
/**
* @param db
* @param tableName
@@ -638,7 +645,7 @@
}
@Override
- public Map<String, String> partitionNameToSpec(String name) throws MetaException, TException {
+ public Map<String, String> partitionNameToSpec(String name) throws MetaException, TException{
return client.partition_name_to_spec(name);
}
}
Index: metastore/src/gen-javabean/org/apache/hadoop/hive/metastore/api/ThriftHiveMetastore.java
===================================================================
--- metastore/src/gen-javabean/org/apache/hadoop/hive/metastore/api/ThriftHiveMetastore.java (revision 948363)
+++ metastore/src/gen-javabean/org/apache/hadoop/hive/metastore/api/ThriftHiveMetastore.java (working copy)
@@ -84,6 +84,8 @@
public List<String> partition_name_to_vals(String part_name) throws MetaException, TException;
public Map<String, String> partition_name_to_spec(String part_name) throws MetaException, TException;
+
+ public void create_index(Index index) throws AlreadyExistsException, InvalidObjectException, MetaException, NoSuchObjectException, TException;
}
@@ -1275,7 +1277,49 @@
}
throw new TApplicationException(TApplicationException.MISSING_RESULT, "partition_name_to_spec failed: unknown result");
}
+
+ public void create_index(Index index) throws AlreadyExistsException, InvalidObjectException, MetaException, NoSuchObjectException, TException
+ {
+ send_create_index(index);
+ recv_create_index();
+ }
+ public void send_create_index(Index index) throws TException
+ {
+ oprot_.writeMessageBegin(new TMessage("create_index", TMessageType.CALL, seqid_));
+ create_index_args args = new create_index_args();
+ args.index = index;
+ args.write(oprot_);
+ oprot_.writeMessageEnd();
+ oprot_.getTransport().flush();
+ }
+
+ public void recv_create_index() throws AlreadyExistsException, InvalidObjectException, MetaException, NoSuchObjectException, TException
+ {
+ TMessage msg = iprot_.readMessageBegin();
+ if (msg.type == TMessageType.EXCEPTION) {
+ TApplicationException x = TApplicationException.read(iprot_);
+ iprot_.readMessageEnd();
+ throw x;
+ }
+ create_index_result result = new create_index_result();
+ result.read(iprot_);
+ iprot_.readMessageEnd();
+ if (result.o1 != null) {
+ throw result.o1;
+ }
+ if (result.o2 != null) {
+ throw result.o2;
+ }
+ if (result.o3 != null) {
+ throw result.o3;
+ }
+ if (result.o4 != null) {
+ throw result.o4;
+ }
+ return;
+ }
+
}
public static class Processor extends com.facebook.fb303.FacebookService.Processor implements TProcessor {
private static final Logger LOGGER = Logger.getLogger(Processor.class.getName());
@@ -1313,6 +1357,7 @@
processMap_.put("get_config_value", new get_config_value());
processMap_.put("partition_name_to_vals", new partition_name_to_vals());
processMap_.put("partition_name_to_spec", new partition_name_to_spec());
+ processMap_.put("create_index", new create_index());
}
private Iface iface_;
@@ -1784,7 +1829,6 @@
oprot.writeMessageEnd();
oprot.getTransport().flush();
}
-
}
private class add_partition implements ProcessFunction {
@@ -1816,7 +1860,6 @@
oprot.writeMessageEnd();
oprot.getTransport().flush();
}
-
}
private class append_partition implements ProcessFunction {
@@ -1850,6 +1893,39 @@
}
}
+
+ private class create_index implements ProcessFunction {
+ public void process(int seqid, TProtocol iprot, TProtocol oprot) throws TException
+ {
+ create_index_args args = new create_index_args();
+ args.read(iprot);
+ iprot.readMessageEnd();
+ create_index_result result = new create_index_result();
+ try {
+ iface_.create_index(args.index);
+ } catch (AlreadyExistsException o1) {
+ result.o1 = o1;
+ } catch (InvalidObjectException o2) {
+ result.o2 = o2;
+ } catch (MetaException o3) {
+ result.o3 = o3;
+ } catch (NoSuchObjectException o4) {
+ result.o4 = o4;
+ } catch (Throwable th) {
+ LOGGER.error("Internal error processing create_index", th);
+ TApplicationException x = new TApplicationException(TApplicationException.INTERNAL_ERROR, "Internal error processing create_index");
+ oprot.writeMessageBegin(new TMessage("create_index", TMessageType.EXCEPTION, seqid));
+ x.write(oprot);
+ oprot.writeMessageEnd();
+ oprot.getTransport().flush();
+ return;
+ }
+ oprot.writeMessageBegin(new TMessage("create_index", TMessageType.REPLY, seqid));
+ result.write(oprot);
+ oprot.writeMessageEnd();
+ oprot.getTransport().flush();
+ }
+ }
private class append_partition_by_name implements ProcessFunction {
public void process(int seqid, TProtocol iprot, TProtocol oprot) throws TException
@@ -20182,5 +20258,596 @@
}
}
+
+
+ public static class create_index_args implements TBase, java.io.Serializable, Cloneable {
+ private static final TStruct STRUCT_DESC = new TStruct("create_index_args");
+ private static final TField INDEX_FIELD_DESC = new TField("index", TType.STRUCT, (short)1);
+ private Index index;
+ public static final int INDEX = 1;
+
+ private final Isset __isset = new Isset();
+ private static final class Isset implements java.io.Serializable {
+ }
+
+ public static final Map<Integer, FieldMetaData> metaDataMap = Collections.unmodifiableMap(new HashMap<Integer, FieldMetaData>() {{
+ put(INDEX, new FieldMetaData("index", TFieldRequirementType.DEFAULT,
+ new StructMetaData(TType.STRUCT, Index.class)));
+ }});
+
+ static {
+ FieldMetaData.addStructMetaDataMap(create_index_args.class, metaDataMap);
+ }
+
+ public create_index_args() {
+ }
+
+ public create_index_args(
+ Index index)
+ {
+ this();
+ this.index = index;
+ }
+
+ /**
+ * Performs a deep copy on other.
+ */
+ public create_index_args(create_index_args other) {
+ if (other.isSetIndex()) {
+ this.index = new Index(other.index);
+ }
+ }
+
+ @Override
+ public create_index_args clone() {
+ return new create_index_args(this);
+ }
+
+ public Index getIndex() {
+ return this.index;
+ }
+
+ public void setIndex(Index index) {
+ this.index = index;
+ }
+
+ public void unsetIndex() {
+ this.index = null;
+ }
+
+ // Returns true if field index is set (has been assigned a value) and false otherwise
+ public boolean isSetIndex() {
+ return this.index != null;
+ }
+
+ public void setFieldValue(int fieldID, Object value) {
+ switch (fieldID) {
+ case INDEX:
+ if (value == null) {
+ unsetIndex();
+ } else {
+ setIndex((Index)value);
+ }
+ break;
+
+ default:
+ throw new IllegalArgumentException("Field " + fieldID + " doesn't exist!");
+ }
+ }
+
+ public Object getFieldValue(int fieldID) {
+ switch (fieldID) {
+ case INDEX:
+ return getIndex();
+
+ default:
+ throw new IllegalArgumentException("Field " + fieldID + " doesn't exist!");
+ }
+ }
+
+ // Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise
+ public boolean isSet(int fieldID) {
+ switch (fieldID) {
+ case INDEX:
+ return isSetIndex();
+ default:
+ throw new IllegalArgumentException("Field " + fieldID + " doesn't exist!");
+ }
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof create_index_args)
+ return this.equals((create_index_args)that);
+ return false;
+ }
+
+ public boolean equals(create_index_args that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_index = true && this.isSetIndex();
+ boolean that_present_index = true && that.isSetIndex();
+ if (this_present_index || that_present_index) {
+ if (!(this_present_index && that_present_index))
+ return false;
+ if (!this.index.equals(that.index))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ public void read(TProtocol iprot) throws TException {
+ TField field;
+ iprot.readStructBegin();
+ while (true)
+ {
+ field = iprot.readFieldBegin();
+ if (field.type == TType.STOP) {
+ break;
+ }
+ switch (field.id)
+ {
+ case INDEX:
+ if (field.type == TType.STRUCT) {
+ this.index = new Index();
+ this.index.read(iprot);
+ } else {
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ default:
+ TProtocolUtil.skip(iprot, field.type);
+ break;
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ validate();
+ }
+
+ public void write(TProtocol oprot) throws TException {
+ validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (this.index != null) {
+ oprot.writeFieldBegin(INDEX_FIELD_DESC);
+ this.index.write(oprot);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("create_index_args(");
+ boolean first = true;
+
+ sb.append("index:");
+ if (this.index == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.index);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws TException {
+ // check for required fields
+ // check that fields of type enum have valid values
+ }
+
+ }
+
+ public static class create_index_result implements TBase, java.io.Serializable, Cloneable {
+ private static final TStruct STRUCT_DESC = new TStruct("create_index_result");
+ private static final TField O1_FIELD_DESC = new TField("o1", TType.STRUCT, (short)1);
+ private static final TField O2_FIELD_DESC = new TField("o2", TType.STRUCT, (short)2);
+ private static final TField O3_FIELD_DESC = new TField("o3", TType.STRUCT, (short)3);
+ private static final TField O4_FIELD_DESC = new TField("o4", TType.STRUCT, (short)4);
+
+ private AlreadyExistsException o1;
+ public static final int O1 = 1;
+ private InvalidObjectException o2;
+ public static final int O2 = 2;
+ private MetaException o3;
+ public static final int O3 = 3;
+ private NoSuchObjectException o4;
+ public static final int O4 = 4;
+
+ private final Isset __isset = new Isset();
+ private static final class Isset implements java.io.Serializable {
+ }
+
+ public static final Map<Integer, FieldMetaData> metaDataMap = Collections.unmodifiableMap(new HashMap<Integer, FieldMetaData>() {{
+ put(O1, new FieldMetaData("o1", TFieldRequirementType.DEFAULT,
+ new FieldValueMetaData(TType.STRUCT)));
+ put(O2, new FieldMetaData("o2", TFieldRequirementType.DEFAULT,
+ new FieldValueMetaData(TType.STRUCT)));
+ put(O3, new FieldMetaData("o3", TFieldRequirementType.DEFAULT,
+ new FieldValueMetaData(TType.STRUCT)));
+ put(O4, new FieldMetaData("o4", TFieldRequirementType.DEFAULT,
+ new FieldValueMetaData(TType.STRUCT)));
+ }});
+
+ static {
+ FieldMetaData.addStructMetaDataMap(create_index_result.class, metaDataMap);
+ }
+
+ public create_index_result() {
+ }
+
+ public create_index_result(
+ AlreadyExistsException o1,
+ InvalidObjectException o2,
+ MetaException o3,
+ NoSuchObjectException o4)
+ {
+ this();
+ this.o1 = o1;
+ this.o2 = o2;
+ this.o3 = o3;
+ this.o4 = o4;
+ }
+
+ /**
+ * Performs a deep copy on other.
+ */
+ public create_index_result(create_index_result other) {
+ if (other.isSetO1()) {
+ this.o1 = new AlreadyExistsException(other.o1);
+ }
+ if (other.isSetO2()) {
+ this.o2 = new InvalidObjectException(other.o2);
+ }
+ if (other.isSetO3()) {
+ this.o3 = new MetaException(other.o3);
+ }
+ if (other.isSetO4()) {
+ this.o4 = new NoSuchObjectException(other.o4);
+ }
+ }
+
+ @Override
+ public create_index_result clone() {
+ return new create_index_result(this);
+ }
+
+ public AlreadyExistsException getO1() {
+ return this.o1;
+ }
+
+ public void setO1(AlreadyExistsException o1) {
+ this.o1 = o1;
+ }
+
+ public void unsetO1() {
+ this.o1 = null;
+ }
+
+ // Returns true if field o1 is set (has been assigned a value) and false otherwise
+ public boolean isSetO1() {
+ return this.o1 != null;
+ }
+
+ public InvalidObjectException getO2() {
+ return this.o2;
+ }
+
+ public void setO2(InvalidObjectException o2) {
+ this.o2 = o2;
+ }
+
+ public void unsetO2() {
+ this.o2 = null;
+ }
+
+ // Returns true if field o2 is set (has been assigned a value) and false otherwise
+ public boolean isSetO2() {
+ return this.o2 != null;
+ }
+
+ public MetaException getO3() {
+ return this.o3;
+ }
+
+ public void setO3(MetaException o3) {
+ this.o3 = o3;
+ }
+
+ public void unsetO3() {
+ this.o3 = null;
+ }
+
+ // Returns true if field o3 is set (has been assigned a value) and false otherwise
+ public boolean isSetO3() {
+ return this.o3 != null;
+ }
+
+ public NoSuchObjectException getO4() {
+ return this.o4;
+ }
+
+ public void setO4(NoSuchObjectException o4) {
+ this.o4 = o4;
+ }
+
+ public void unsetO4() {
+ this.o4 = null;
+ }
+
+ // Returns true if field o4 is set (has been assigned a value) and false otherwise
+ public boolean isSetO4() {
+ return this.o4 != null;
+ }
+
+ public void setFieldValue(int fieldID, Object value) {
+ switch (fieldID) {
+ case O1:
+ if (value == null) {
+ unsetO1();
+ } else {
+ setO1((AlreadyExistsException)value);
+ }
+ break;
+
+ case O2:
+ if (value == null) {
+ unsetO2();
+ } else {
+ setO2((InvalidObjectException)value);
+ }
+ break;
+
+ case O3:
+ if (value == null) {
+ unsetO3();
+ } else {
+ setO3((MetaException)value);
+ }
+ break;
+
+ case O4:
+ if (value == null) {
+ unsetO4();
+ } else {
+ setO4((NoSuchObjectException)value);
+ }
+ break;
+
+ default:
+ throw new IllegalArgumentException("Field " + fieldID + " doesn't exist!");
+ }
+ }
+
+ public Object getFieldValue(int fieldID) {
+ switch (fieldID) {
+ case O1:
+ return getO1();
+
+ case O2:
+ return getO2();
+
+ case O3:
+ return getO3();
+
+ case O4:
+ return getO4();
+
+ default:
+ throw new IllegalArgumentException("Field " + fieldID + " doesn't exist!");
+ }
+ }
+
+ // Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise
+ public boolean isSet(int fieldID) {
+ switch (fieldID) {
+ case O1:
+ return isSetO1();
+ case O2:
+ return isSetO2();
+ case O3:
+ return isSetO3();
+ case O4:
+ return isSetO4();
+ default:
+ throw new IllegalArgumentException("Field " + fieldID + " doesn't exist!");
+ }
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof create_index_result)
+ return this.equals((create_index_result)that);
+ return false;
+ }
+
+ public boolean equals(create_index_result that) {
+ if (that == null)
+ return false;
+ boolean this_present_o1 = true && this.isSetO1();
+ boolean that_present_o1 = true && that.isSetO1();
+ if (this_present_o1 || that_present_o1) {
+ if (!(this_present_o1 && that_present_o1))
+ return false;
+ if (!this.o1.equals(that.o1))
+ return false;
+ }
+
+ boolean this_present_o2 = true && this.isSetO2();
+ boolean that_present_o2 = true && that.isSetO2();
+ if (this_present_o2 || that_present_o2) {
+ if (!(this_present_o2 && that_present_o2))
+ return false;
+ if (!this.o2.equals(that.o2))
+ return false;
+ }
+
+ boolean this_present_o3 = true && this.isSetO3();
+ boolean that_present_o3 = true && that.isSetO3();
+ if (this_present_o3 || that_present_o3) {
+ if (!(this_present_o3 && that_present_o3))
+ return false;
+ if (!this.o3.equals(that.o3))
+ return false;
+ }
+
+ boolean this_present_o4 = true && this.isSetO4();
+ boolean that_present_o4 = true && that.isSetO4();
+ if (this_present_o4 || that_present_o4) {
+ if (!(this_present_o4 && that_present_o4))
+ return false;
+ if (!this.o4.equals(that.o4))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ public void read(TProtocol iprot) throws TException {
+ TField field;
+ iprot.readStructBegin();
+ while (true)
+ {
+ field = iprot.readFieldBegin();
+ if (field.type == TType.STOP) {
+ break;
+ }
+ switch (field.id)
+ {
+ case O1:
+ if (field.type == TType.STRUCT) {
+ this.o1 = new AlreadyExistsException();
+ this.o1.read(iprot);
+ } else {
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ case O2:
+ if (field.type == TType.STRUCT) {
+ this.o2 = new InvalidObjectException();
+ this.o2.read(iprot);
+ } else {
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ case O3:
+ if (field.type == TType.STRUCT) {
+ this.o3 = new MetaException();
+ this.o3.read(iprot);
+ } else {
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ case O4:
+ if (field.type == TType.STRUCT) {
+ this.o4 = new NoSuchObjectException();
+ this.o4.read(iprot);
+ } else {
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ default:
+ TProtocolUtil.skip(iprot, field.type);
+ break;
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ validate();
+ }
+
+ public void write(TProtocol oprot) throws TException {
+ oprot.writeStructBegin(STRUCT_DESC);
+
+ if (this.isSetO1()) {
+ oprot.writeFieldBegin(O1_FIELD_DESC);
+ this.o1.write(oprot);
+ oprot.writeFieldEnd();
+ } else if (this.isSetO2()) {
+ oprot.writeFieldBegin(O2_FIELD_DESC);
+ this.o2.write(oprot);
+ oprot.writeFieldEnd();
+ } else if (this.isSetO3()) {
+ oprot.writeFieldBegin(O3_FIELD_DESC);
+ this.o3.write(oprot);
+ oprot.writeFieldEnd();
+ } else if (this.isSetO4()) {
+ oprot.writeFieldBegin(O4_FIELD_DESC);
+ this.o4.write(oprot);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("create_index_result(");
+ boolean first = true;
+
+ sb.append("o1:");
+ if (this.o1 == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.o1);
+ }
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("o2:");
+ if (this.o2 == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.o2);
+ }
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("o3:");
+ if (this.o3 == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.o3);
+ }
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("o4:");
+ if (this.o4 == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.o4);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws TException {
+ // check for required fields
+ // check that fields of type enum have valid values
+ }
+
+ }
+
+
}
Index: metastore/src/gen-javabean/org/apache/hadoop/hive/metastore/api/Index.java
===================================================================
--- metastore/src/gen-javabean/org/apache/hadoop/hive/metastore/api/Index.java (revision 948363)
+++ metastore/src/gen-javabean/org/apache/hadoop/hive/metastore/api/Index.java (working copy)
@@ -29,7 +29,7 @@
private String indexName;
public static final int INDEXNAME = 1;
- private int indexType;
+ private String indexType;
public static final int INDEXTYPE = 2;
private String tableName;
public static final int TABLENAME = 3;
@@ -70,7 +70,7 @@
public Index(
String indexName,
- int indexType,
+ String indexType,
String tableName,
String dbName,
List<String> colNames,
@@ -135,11 +135,11 @@
return this.indexName != null;
}
- public int getIndexType() {
+ public String getIndexType() {
return this.indexType;
}
- public void setIndexType(int indexType) {
+ public void setIndexType(String indexType) {
this.indexType = indexType;
this.__isset.indexType = true;
}
@@ -250,7 +250,7 @@
if (value == null) {
unsetIndexType();
} else {
- setIndexType((Integer)value);
+ setIndexType((String)value);
}
break;
@@ -431,7 +431,7 @@
break;
case INDEXTYPE:
- if (field.type == TType.I32) {
+ if (field.type == TType.STRING) {
- this.indexType = iprot.readI32();
+ this.indexType = iprot.readString();
this.__isset.indexType = true;
} else {
TProtocolUtil.skip(iprot, field.type);
@@ -495,9 +495,6 @@
oprot.writeString(this.indexName);
oprot.writeFieldEnd();
}
- oprot.writeFieldBegin(INDEX_TYPE_FIELD_DESC);
- oprot.writeI32(this.indexType);
- oprot.writeFieldEnd();
if (this.tableName != null) {
oprot.writeFieldBegin(TABLE_NAME_FIELD_DESC);
oprot.writeString(this.tableName);
Index: metastore/src/gen-php/hive_metastore_types.php
===================================================================
--- metastore/src/gen-php/hive_metastore_types.php (revision 948363)
+++ metastore/src/gen-php/hive_metastore_types.php (working copy)
@@ -1699,7 +1699,6 @@
public $tableName = null;
public $dbName = null;
public $colNames = null;
- public $partName = null;
public function __construct($vals=null) {
if (!isset(self::$_TSPEC)) {
@@ -1710,7 +1709,7 @@
),
2 => array(
'var' => 'indexType',
- 'type' => TType::I32,
+ 'type' => TType::STRING,
),
3 => array(
'var' => 'tableName',
@@ -1728,10 +1727,6 @@
'type' => TType::STRING,
),
),
- 6 => array(
- 'var' => 'partName',
- 'type' => TType::STRING,
- ),
);
}
if (is_array($vals)) {
@@ -1750,9 +1745,6 @@
if (isset($vals['colNames'])) {
$this->colNames = $vals['colNames'];
}
- if (isset($vals['partName'])) {
- $this->partName = $vals['partName'];
- }
}
}
@@ -1783,8 +1775,8 @@
}
break;
case 2:
- if ($ftype == TType::I32) {
- $xfer += $input->readI32($this->indexType);
+ if ($ftype == TType::STRING) {
+ $xfer += $input->readString($this->indexType);
} else {
$xfer += $input->skip($ftype);
}
@@ -1820,13 +1812,6 @@
$xfer += $input->skip($ftype);
}
break;
- case 6:
- if ($ftype == TType::STRING) {
- $xfer += $input->readString($this->partName);
- } else {
- $xfer += $input->skip($ftype);
- }
- break;
default:
$xfer += $input->skip($ftype);
break;
@@ -1846,8 +1831,8 @@
$xfer += $output->writeFieldEnd();
}
if ($this->indexType !== null) {
- $xfer += $output->writeFieldBegin('indexType', TType::I32, 2);
- $xfer += $output->writeI32($this->indexType);
+ $xfer += $output->writeFieldBegin('indexType', TType::STRING, 2);
+ $xfer += $output->writeString($this->indexType);
$xfer += $output->writeFieldEnd();
}
if ($this->tableName !== null) {
@@ -1877,11 +1862,6 @@
}
$xfer += $output->writeFieldEnd();
}
- if ($this->partName !== null) {
- $xfer += $output->writeFieldBegin('partName', TType::STRING, 6);
- $xfer += $output->writeString($this->partName);
- $xfer += $output->writeFieldEnd();
- }
$xfer += $output->writeFieldStop();
$xfer += $output->writeStructEnd();
return $xfer;
Index: metastore/if/hive_metastore.thrift
===================================================================
--- metastore/if/hive_metastore.thrift (revision 948363)
+++ metastore/if/hive_metastore.thrift (working copy)
@@ -88,15 +88,15 @@
7: map<string,string> parameters
}
-// index on a hive table is also another table whose columns are the subset of the base table columns along with the offset
-// this will automatically generate table (table_name_index_name)
+// An index on a Hive table is itself another table whose columns are a subset
+// of the base table columns along with the offset. This will automatically
+// generate a table named (table_name_index_name).
struct Index {
1: string indexName, // unique within the whole database namespace
- 2: i32 indexType, // reserved
+ 2: string indexType, // index type
3: string tableName,
4: string dbName,
5: list<string> colNames, // for now columns will be sorted in the ascending order
- 6: string partName // partition name
}
// schema of the table/query results etc.
@@ -243,7 +243,9 @@
// converts a partition name into a partition specification (a mapping from
// the partition cols to the values)
map<string, string> partition_name_to_spec(1: string part_name)
- throws(1: MetaException o1)
+ throws(1: MetaException o1)
+ void create_index(1:Index index)
+ throws(1:AlreadyExistsException o1, 2:InvalidObjectException o2, 3:MetaException o3, 4:NoSuchObjectException o4)
}
// these should be needed only for backward compatibility with filestore
Index: build-common.xml
===================================================================
--- build-common.xml (revision 948363)
+++ build-common.xml (working copy)
@@ -207,8 +207,7 @@
-
+
Index: CHANGES.txt
===================================================================
--- CHANGES.txt (revision 948363)
+++ CHANGES.txt (working copy)
@@ -453,19 +453,6 @@
HIVE-1357. optimize CombineHiveInputFormat to cache inputFormat
(Ning Zhang via namit)
- HIVE-1350. hive.query.id is not unique
- (Namit Jain via Ning Zhang)
-
- HIVE-1335 DataNucleus should use connection pooling
- (Edward Capriolo via namit)
-
- HIVE-1366. inputFileFormat error if the merge job takes a different input
- file format than the default output file format
- (Namit Jain via Ning Zhang)
-
- HIVE-1365. Bug in sort-merge join
- (He Yongqiang via namit)
-
Release 0.5.0 - Unreleased
INCOMPATIBLE CHANGES
Index: serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/PrimitiveObjectInspectorUtils.java
===================================================================
--- serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/PrimitiveObjectInspectorUtils.java (revision 948363)
+++ serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/PrimitiveObjectInspectorUtils.java (working copy)
@@ -18,6 +18,9 @@
package org.apache.hadoop.hive.serde2.objectinspector.primitive;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@@ -36,6 +39,7 @@
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
/**
* ObjectInspectorFactory is the primary way to create new ObjectInspector
@@ -49,7 +53,7 @@
/**
* TypeEntry stores information about a Hive Primitive TypeInfo.
*/
- public static class PrimitiveTypeEntry {
+ public static class PrimitiveTypeEntry implements Writable {
/**
* The category of the PrimitiveType.
@@ -85,6 +89,29 @@
primitiveWritableClass = hiveClass;
this.typeName = typeName;
}
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ primitiveCategory = WritableUtils.readEnum(in,
+ PrimitiveObjectInspector.PrimitiveCategory.class);
+ typeName = WritableUtils.readString(in);
+ try {
+ primitiveJavaType = Class.forName(WritableUtils.readString(in));
+ primitiveJavaClass = Class.forName(WritableUtils.readString(in));
+ primitiveWritableClass = Class.forName(WritableUtils.readString(in));
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ WritableUtils.writeEnum(out, primitiveCategory);
+ WritableUtils.writeString(out, typeName);
+ WritableUtils.writeString(out, primitiveJavaType.getName());
+ WritableUtils.writeString(out, primitiveJavaClass.getName());
+ WritableUtils.writeString(out, primitiveWritableClass.getName());
+ }
}
static final Map<PrimitiveCategory, PrimitiveTypeEntry> primitiveCategoryToTypeEntry = new HashMap<PrimitiveCategory, PrimitiveTypeEntry>();
Index: README.txt
===================================================================
--- README.txt (revision 948363)
+++ README.txt (working copy)
@@ -158,7 +158,7 @@
hive configuration variable named javax.jdo.option.ConnectionURL. By default
(see conf/hive-default.xml) - this location is ./metastore_db
-Right now, in the default configuration, this metadata can only be seen by
+Right now - in the default configuration, this metadata can only be seen by
one user at a time.
Metastore can be stored in any database that is supported by JPOX. The
@@ -168,7 +168,7 @@
The database schema is defined in JDO metadata annotations file package.jdo
at src/contrib/hive/metastore/src/model.
-In the future, the metastore itself can be a standalone server.
+In the future - the metastore itself can be a standalone server.
DML Operations
Index: ql/src/test/results/clientpositive/index_compact.q.out
===================================================================
--- ql/src/test/results/clientpositive/index_compact.q.out (revision 0)
+++ ql/src/test/results/clientpositive/index_compact.q.out (revision 0)
@@ -0,0 +1,70 @@
+PREHOOK: query: EXPLAIN
+CREATE INDEX srcpart_index_proj TYPE COMPACT ON TABLE srcpart(key)
+PREHOOK: type: null
+POSTHOOK: query: EXPLAIN
+CREATE INDEX srcpart_index_proj TYPE COMPACT ON TABLE srcpart(key)
+POSTHOOK: type: null
+ABSTRACT SYNTAX TREE:
+ (TOK_CREATEINDEX srcpart_index_proj COMPACT srcpart (TOK_TABCOLNAME key))
+
+STAGE DEPENDENCIES:
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-0
+
+
+PREHOOK: query: CREATE INDEX srcpart_index_proj TYPE COMPACT ON TABLE srcpart(key)
+PREHOOK: type: null
+POSTHOOK: query: CREATE INDEX srcpart_index_proj TYPE COMPACT ON TABLE srcpart(key)
+POSTHOOK: type: null
+PREHOOK: query: UPDATE INDEX srcpart_index_proj
+PREHOOK: type: null
+POSTHOOK: query: UPDATE INDEX srcpart_index_proj
+POSTHOOK: type: null
+PREHOOK: query: SELECT x.* FROM srcpart_index_proj x WHERE x.ds = '2008-04-08' and x.hr = 11
+PREHOOK: type: QUERY
+PREHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive-Index/build/ql/scratchdir/hive_2010-06-01_18-59-55_156_2360593265979076422/10000
+POSTHOOK: query: SELECT x.* FROM srcpart_index_proj x WHERE x.ds = '2008-04-08' and x.hr = 11
+POSTHOOK: type: QUERY
+POSTHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive-Index/build/ql/scratchdir/hive_2010-06-01_18-59-55_156_2360593265979076422/10000
+PREHOOK: query: DROP TABLE srcpart_index_proj
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE srcpart_index_proj
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Output: default@srcpart_index_proj
+PREHOOK: query: EXPLAIN
+CREATE INDEX srcpart_index_proj TYPE COMPACT ON TABLE srcpart(key)
+PREHOOK: type: null
+POSTHOOK: query: EXPLAIN
+CREATE INDEX srcpart_index_proj TYPE COMPACT ON TABLE srcpart(key)
+POSTHOOK: type: null
+ABSTRACT SYNTAX TREE:
+ (TOK_CREATEINDEX srcpart_index_proj COMPACT srcpart (TOK_TABCOLNAME key))
+
+STAGE DEPENDENCIES:
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-0
+
+
+PREHOOK: query: CREATE INDEX srcpart_index_proj TYPE COMPACT ON TABLE srcpart(key)
+PREHOOK: type: null
+POSTHOOK: query: CREATE INDEX srcpart_index_proj TYPE COMPACT ON TABLE srcpart(key)
+POSTHOOK: type: null
+PREHOOK: query: UPDATE INDEX srcpart_index_proj
+PREHOOK: type: null
+POSTHOOK: query: UPDATE INDEX srcpart_index_proj
+POSTHOOK: type: null
+PREHOOK: query: SELECT x.* FROM srcpart_index_proj x
+PREHOOK: type: QUERY
+PREHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive-Index/build/ql/scratchdir/hive_2010-06-01_18-59-55_681_5703964119870136891/10000
+POSTHOOK: query: SELECT x.* FROM srcpart_index_proj x
+POSTHOOK: type: QUERY
+POSTHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive-Index/build/ql/scratchdir/hive_2010-06-01_18-59-55_681_5703964119870136891/10000
+PREHOOK: query: DROP TABLE srcpart_index_proj
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE srcpart_index_proj
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Output: default@srcpart_index_proj
Index: ql/src/test/queries/clientnegative/bad_indextype.q
===================================================================
--- ql/src/test/queries/clientnegative/bad_indextype.q (revision 0)
+++ ql/src/test/queries/clientnegative/bad_indextype.q (revision 0)
@@ -0,0 +1 @@
+CREATE INDEX srcpart_index_proj TYPE UNKNOWN ON TABLE srcpart(key);
\ No newline at end of file
Index: ql/src/test/queries/clientnegative/index_partition_not_exist.q
===================================================================
--- ql/src/test/queries/clientnegative/index_partition_not_exist.q (revision 0)
+++ ql/src/test/queries/clientnegative/index_partition_not_exist.q (revision 0)
@@ -0,0 +1,2 @@
+CREATE INDEX srcpart_index_proj TYPE SUMMARY ON TABLE srcpart(key) PARTITION(ds='2008-04-08', hr=11);
+UPDATE INDEX srcpart_index_proj PARTITION(ds='2008-04-08', hr=12);
\ No newline at end of file
Index: ql/src/test/queries/clientnegative/index_name_unavailable.q
===================================================================
--- ql/src/test/queries/clientnegative/index_name_unavailable.q (revision 0)
+++ ql/src/test/queries/clientnegative/index_name_unavailable.q (revision 0)
@@ -0,0 +1 @@
+CREATE INDEX srcpart TYPE PROJECTION ON TABLE src(key);
\ No newline at end of file
Index: ql/src/test/queries/clientpositive/index_summary.q
===================================================================
--- ql/src/test/queries/clientpositive/index_summary.q (revision 0)
+++ ql/src/test/queries/clientpositive/index_summary.q (revision 0)
@@ -0,0 +1,13 @@
+EXPLAIN
+CREATE INDEX srcpart_index_proj TYPE SUMMARY ON TABLE srcpart(key);
+CREATE INDEX srcpart_index_proj TYPE SUMMARY ON TABLE srcpart(key);
+UPDATE INDEX srcpart_index_proj;
+SELECT x.* FROM srcpart_index_proj x WHERE x.ds = '2008-04-08' and x.hr = 11;
+DROP TABLE srcpart_index_proj;
+
+EXPLAIN
+CREATE INDEX srcpart_index_proj TYPE SUMMARY ON TABLE srcpart(key) PARTITION(ds='2008-04-08', hr=11);
+CREATE INDEX srcpart_index_proj TYPE SUMMARY ON TABLE srcpart(key) PARTITION(ds='2008-04-08', hr=11);
+UPDATE INDEX srcpart_index_proj;
+SELECT x.* FROM srcpart_index_proj x;
+DROP TABLE srcpart_index_proj;
\ No newline at end of file
Index: ql/src/test/queries/clientpositive/index_projection.q
===================================================================
--- ql/src/test/queries/clientpositive/index_projection.q (revision 0)
+++ ql/src/test/queries/clientpositive/index_projection.q (revision 0)
@@ -0,0 +1,13 @@
+EXPLAIN
+CREATE INDEX srcpart_index_proj TYPE PROJECTION ON TABLE srcpart(key);
+CREATE INDEX srcpart_index_proj TYPE PROJECTION ON TABLE srcpart(key);
+UPDATE INDEX srcpart_index_proj;
+SELECT x.* FROM srcpart_index_proj x WHERE x.ds = '2008-04-08' and x.hr = 11;
+DROP TABLE srcpart_index_proj;
+
+EXPLAIN
+CREATE INDEX srcpart_index_proj TYPE PROJECTION ON TABLE srcpart(key) PARTITION(ds='2008-04-08', hr=11);
+CREATE INDEX srcpart_index_proj TYPE PROJECTION ON TABLE srcpart(key) PARTITION(ds='2008-04-08', hr=11);
+UPDATE INDEX srcpart_index_proj;
+SELECT x.* FROM srcpart_index_proj x;
+DROP TABLE srcpart_index_proj;
\ No newline at end of file
Index: ql/src/test/queries/clientpositive/index_compact.q
===================================================================
--- ql/src/test/queries/clientpositive/index_compact.q (revision 0)
+++ ql/src/test/queries/clientpositive/index_compact.q (revision 0)
@@ -0,0 +1,13 @@
+EXPLAIN
+CREATE INDEX srcpart_index_proj TYPE COMPACT ON TABLE srcpart(key);
+CREATE INDEX srcpart_index_proj TYPE COMPACT ON TABLE srcpart(key);
+UPDATE INDEX srcpart_index_proj;
+SELECT x.* FROM srcpart_index_proj x WHERE x.ds = '2008-04-08' and x.hr = 11;
+DROP TABLE srcpart_index_proj;
+
+EXPLAIN
+CREATE INDEX srcpart_index_proj TYPE COMPACT ON TABLE srcpart(key);
+CREATE INDEX srcpart_index_proj TYPE COMPACT ON TABLE srcpart(key);
+UPDATE INDEX srcpart_index_proj;
+SELECT x.* FROM srcpart_index_proj x;
+DROP TABLE srcpart_index_proj;
\ No newline at end of file
Index: ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (revision 948363)
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (working copy)
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -44,7 +45,9 @@
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
@@ -327,6 +330,84 @@
}
/**
+ * Creates the index with the given objects
+ *
+ * @param indexName
+ * the index name
+ * @param tableName
+ * the table name that this index is built on
+ * @param indexType
+ * the type of the index
+ * @param indexedCols
+ * @param inputFormat
+ * @param outputFormat
+ * @param serde
+ * @throws HiveException
+ */
+ public void createIndex(String indexName, String tableName, String indexType,
+ List<String> indexedCols, String inputFormat, String outputFormat,
+ String serde)
+ throws HiveException {
+ try {
+ String dbName = MetaStoreUtils.DEFAULT_DATABASE_NAME;
+ org.apache.hadoop.hive.metastore.api.Table tbl = getMSC().getTable(dbName, tableName).clone();
+ tbl.setParameters(null);
+ MetaStoreUtils.setIndexTable(tbl);
+ MetaStoreUtils.setBaseTableOfIndexTable(tbl, tableName);
+ MetaStoreUtils.setIndexType(tbl, indexType);
+ tbl.setTableName(indexName);
+ List<FieldSchema> indexTblCols = new ArrayList<FieldSchema>();
+ List<Order> sortCols = new ArrayList<Order>();
+ tbl.getSd().setBucketCols(null);
+ int k = 0;
+ for (int i = 0; i < tbl.getSd().getCols().size(); i++) {
+ FieldSchema col = tbl.getSd().getCols().get(i);
+ if (indexedCols.contains(col.getName())) {
+ indexTblCols.add(col);
+ sortCols.add(new Order(col.getName(), 1));
+ k++;
+ }
+ }
+ if (k != indexedCols.size())
+ throw new RuntimeException(
+ "Check the index columns, they should appear in the table being indexed.");
+
+ FieldSchema bucketFileName = new FieldSchema("_bucketname", "string", "");
+ indexTblCols.add(bucketFileName);
+ FieldSchema offSets = new FieldSchema("_offsets", "array<bigint>", "");
+ indexTblCols.add(offSets);
+ tbl.getSd().setCols(indexTblCols);
+ tbl.getSd().setLocation(null);
+ tbl.getSd().setNumBuckets(1);
+ tbl.getSd().setSortCols(sortCols);
+ if(inputFormat == null) {
+ inputFormat = "org.apache.hadoop.mapred.TextInputFormat";
+ }
+ tbl.getSd().setInputFormat(inputFormat);
+
+ if(outputFormat == null) {
+ outputFormat =
+ "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat";
+ }
+ tbl.getSd().setOutputFormat(outputFormat);
+
+ if(serde == null) {
+ serde = org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName();
+ }
+
+ tbl.getSd().getSerdeInfo().setSerializationLib(serde);
+ getMSC().createTable(tbl);
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
+ }
+
+ /**
* Drops table along with the data in it. If the table doesn't exist then it
* is a no-op
*
@@ -1101,4 +1182,6 @@
+ e.getMessage(), e);
}
}
+
+
};
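
As a quick illustration of the defaulting behaviour in Hive.createIndex() above,
a caller may pass null for the input format, output format and serde and get a
text-format index table over the chosen columns; the column choice here is an
assumption for illustration:

    import java.util.Arrays;
    import org.apache.hadoop.hive.ql.metadata.Hive;
    import org.apache.hadoop.hive.ql.metadata.HiveException;

    public class HiveCreateIndexSketch {
      public static void main(String[] args) throws HiveException {
        Hive db = Hive.get();
        // null input/output format and serde fall back to TextInputFormat,
        // HiveIgnoreKeyTextOutputFormat and LazySimpleSerDe respectively.
        db.createIndex("srcpart_index_proj", "srcpart", "COMPACT",
            Arrays.asList("key"), null, null, null);
      }
    }
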
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java (revision 948363)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java (working copy)
@@ -153,6 +153,14 @@
return (ret);
}
+ makeChild(ret, tasklist);
+
+ return (ret);
+ }
+
+
+ public static void makeChild(Task<? extends Serializable> ret,
+ Task<? extends Serializable>... tasklist) {
// Add the new task as child of each of the passed in tasks
for (Task<? extends Serializable> tsk : tasklist) {
List<Task<? extends Serializable>> children = tsk.getChildTasks();
@@ -162,8 +170,6 @@
children.add(ret);
tsk.setChildTasks(children);
}
-
- return (ret);
}
private TaskFactory() {
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (revision 948363)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (working copy)
@@ -55,6 +55,7 @@
import org.apache.hadoop.hive.ql.exec.errors.ErrorAndSolution;
import org.apache.hadoop.hive.ql.exec.errors.TaskLogProcessor;
import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
+import org.apache.hadoop.hive.ql.index.IndexBuilderBaseReducer;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -73,8 +74,10 @@
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.log4j.BasicConfigurator;
@@ -427,7 +430,7 @@
// this is a temporary hack to fix things that are not fixed in the compiler
Integer numReducersFromWork = work.getNumReduceTasks();
- if (work.getReducer() == null) {
+ if (work.getReducer() == null && work.getIndexCols() == null) {
console
.printInfo("Number of reduce tasks is set to 0 since there's no reduce operator");
work.setNumReduceTasks(Integer.valueOf(0));
@@ -547,16 +550,13 @@
throw new RuntimeException("Plan invalid, Reason: " + invalidReason);
}
- String hiveScratchDir;
- if (driverContext.getCtx() != null && driverContext.getCtx().getQueryPath() != null) {
- hiveScratchDir = driverContext.getCtx().getQueryPath().toString();
- } else {
- hiveScratchDir = HiveConf.getVar(job, HiveConf.ConfVars.SCRATCHDIR);
- }
+ String hiveScratchDir = getScratchDir(driverContext);
String jobScratchDirStr = hiveScratchDir + File.separator
+ Utilities.randGen.nextInt();
Path jobScratchDir = new Path(jobScratchDirStr);
+ FileOutputFormat.setOutputPath(job, jobScratchDir);
+
String emptyScratchDirStr = null;
Path emptyScratchDir = null;
@@ -579,13 +579,12 @@
}
}
}
+
+ job.setMapperClass(getMapperClass());
- FileOutputFormat.setOutputPath(job, jobScratchDir);
- job.setMapperClass(ExecMapper.class);
+ job.setMapOutputKeyClass(getMapOutputKeyClass());
+ job.setMapOutputValueClass(getMapOutputValueClass());
- job.setMapOutputKeyClass(HiveKey.class);
- job.setMapOutputValueClass(BytesWritable.class);
-
try {
job.setPartitionerClass((Class<? extends Partitioner>)
(Class.forName(HiveConf.getVar(job, HiveConf.ConfVars.HIVEPARTITIONER))));
@@ -601,7 +600,7 @@
work.getMinSplitSize().intValue());
}
job.setNumReduceTasks(work.getNumReduceTasks().intValue());
- job.setReducerClass(ExecReducer.class);
+ job.setReducerClass(getReducerClass());
if (work.getInputformat() != null) {
HiveConf.setVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT, work.getInputformat());
@@ -615,24 +614,20 @@
HiveConf.ConfVars.HADOOPSPECULATIVEEXECREDUCERS,
useSpeculativeExecReducers);
- String inpFormat = HiveConf.getVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT);
- if ((inpFormat == null) || (!StringUtils.isNotBlank(inpFormat))) {
- inpFormat = ShimLoader.getHadoopShims().getInputFormatClassName();
- }
-
- LOG.info("Using " + inpFormat);
-
try {
- job.setInputFormat((Class<? extends InputFormat>) (Class
- .forName(inpFormat)));
+ job.setInputFormat(getInputFormatCls());
} catch (ClassNotFoundException e) {
- throw new RuntimeException(e.getMessage());
+ throw new RuntimeException(e);
}
// No-Op - we don't really write anything here ..
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
-
+ job.setOutputKeyClass(getOutputKeyClass());
+ job.setOutputValueClass(getOutputValueClass());
+
+ Class<? extends OutputFormat> output = getOutputFormatCls();
+ if (output != null)
+ job.setOutputFormat(output);
+
// Transfer HIVEAUXJARS and HIVEADDEDJARS to "tmpjars" so hadoop understands
// it
String auxJars = HiveConf.getVar(job, HiveConf.ConfVars.HIVEAUXJARS);
@@ -668,7 +663,7 @@
HiveConf.setVar(job, HiveConf.ConfVars.HADOOPJOBNAME, "JOB"
+ randGen.nextInt());
}
-
+
try {
addInputPaths(job, work, emptyScratchDirStr);
@@ -768,6 +763,11 @@
if (work.getReducer() != null) {
work.getReducer().jobClose(job, success, feedBack);
}
+
+ if (IndexBuilderBaseReducer.class.isAssignableFrom(this
+ .getReducerClass())) {
+ this.closeIndexBuilder(job, success);
+ }
}
} catch (Exception e) {
// jobClose needs to execute successfully otherwise fail task
@@ -783,7 +783,27 @@
return (returnVal);
}
+
+ private void closeIndexBuilder(JobConf job, boolean success)
+ throws HiveException, IOException {
+ String outputPath = this.work.getOutputPath();
+ if (outputPath == null) {
+ outputPath = getScratchDir(driverContext) + File.separator + Utilities.randGen.nextInt();
+ }
+ console.printInfo("Closing Index builder job. Output path is " + outputPath);
+ IndexBuilderBaseReducer.indexBuilderJobClose(outputPath, success, job, console);
+ }
+ private String getScratchDir(DriverContext driverContext) {
+ String hiveScratchDir;
+ if (driverContext.getCtx() != null && driverContext.getCtx().getQueryPath() != null) {
+ hiveScratchDir = driverContext.getCtx().getQueryPath().toString();
+ } else {
+ hiveScratchDir = HiveConf.getVar(job, HiveConf.ConfVars.SCRATCHDIR);
+ }
+ return hiveScratchDir;
+ }
+
/**
* This msg pattern is used to track when a job is started.
*
@@ -1288,6 +1308,80 @@
return StageType.MAPRED;
}
+ private boolean delOutputIfExists;
+
+ public Class<? extends InputFormat> getInputFormatCls() throws ClassNotFoundException {
+ if (this.getWork() == null || this.getWork().getInputFormatCls() == null) {
+ String inpFormat = HiveConf.getVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT);
+ if ((inpFormat == null) || (!StringUtils.isNotBlank(inpFormat))) {
+ inpFormat = ShimLoader.getHadoopShims().getInputFormatClassName();
+ }
+ if (inpFormat == null) {
+ return org.apache.hadoop.hive.ql.io.HiveInputFormat.class;
+ }
+ return (Class<? extends InputFormat>) Class.forName(inpFormat);
+ } else {
+ return this.getWork().getInputFormatCls();
+ }
+ }
+
+ public Class<? extends Mapper> getMapperClass() {
+ if (this.getWork() != null && this.getWork().getMapperClass() != null) {
+ return this.getWork().getMapperClass();
+ }
+ return ExecMapper.class;
+ }
+
+ public Class<? extends Reducer> getReducerClass() {
+ if (this.getWork() != null && this.getWork().getReducerClass() != null) {
+ return this.getWork().getReducerClass();
+ }
+ return ExecReducer.class;
+ }
+
+ public Class> getMapOutputKeyClass() {
+ if(this.getWork()!=null && this.getWork().getMapOutputKeyClass() != null) {
+ return this.getWork().getMapOutputKeyClass();
+ }
+ return HiveKey.class;
+ }
+
+ public Class> getMapOutputValueClass() {
+ if(this.getWork()!=null && this.getWork().getMapOutputValueClass() != null) {
+ return this.getWork().getMapOutputValueClass();
+ }
+ return BytesWritable.class;
+ }
+
+ public Class<?> getOutputKeyClass() {
+ if(this.getWork()!=null && this.getWork().getOutputKeyClass() !=null) {
+ return this.getWork().getOutputKeyClass();
+ }
+ return Text.class;
+ }
+
+ public Class<?> getOutputValueClass() {
+ if(this.getWork()!=null && this.getWork().getOutputValueClass() != null) {
+ return this.getWork().getOutputValueClass();
+ }
+ return Text.class;
+ }
+
+ public boolean isDelOutputIfExists() {
+ return delOutputIfExists;
+ }
+
+ public void setDelOutputIfExists(boolean delOutputIfExists) {
+ this.delOutputIfExists = delOutputIfExists;
+ }
+
+ public Class<? extends OutputFormat> getOutputFormatCls() {
+ if (this.work != null) {
+ return this.getWork().getOutputFormatCls();
+ }
+ return null;
+ }
+
@Override
public String getName() {
return "EXEC";
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (revision 948363)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (working copy)
@@ -49,6 +49,7 @@
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Index;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Order;
@@ -79,6 +80,7 @@
import org.apache.hadoop.hive.ql.plan.ShowTableStatusDesc;
import org.apache.hadoop.hive.ql.plan.ShowTablesDesc;
import org.apache.hadoop.hive.ql.plan.TouchDesc;
+import org.apache.hadoop.hive.ql.plan.createIndexDesc;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.Deserializer;
@@ -125,7 +127,13 @@
return createTable(db, crtTbl);
}
+ createIndexDesc crtIndex = work.getCreateIndexDesc();
+ if (crtIndex != null) {
+ return createIndex(db, crtIndex);
+ }
+
CreateTableLikeDesc crtTblLike = work.getCreateTblLikeDesc();
+
if (crtTblLike != null) {
return createTableLike(db, crtTblLike);
}
@@ -209,6 +217,14 @@
return 0;
}
+ private int createIndex(Hive db, createIndexDesc crtIndex) throws HiveException {
+ db.createIndex(crtIndex.getIndexName(), crtIndex.getTableName(),
+ crtIndex.getIndexType(), crtIndex.getIndexedCols(), crtIndex.getInputFormat(),
+ crtIndex.getOutputFormat(), crtIndex.getSerde());
+ return 0;
+ }
+
/**
* Add a partition to a table.
*
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java (revision 948363)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java (working copy)
@@ -30,7 +30,7 @@
*/
public class DDLWork implements Serializable {
private static final long serialVersionUID = 1L;
-
+ private createIndexDesc createIndexDesc;
private CreateTableDesc createTblDesc;
private CreateTableLikeDesc createTblLikeDesc;
private CreateViewDesc createVwDesc;
@@ -63,6 +63,10 @@
this.outputs = outputs;
}
+ public DDLWork(createIndexDesc createIndex) {
+ this.createIndexDesc = createIndex;
+ }
+
/**
* @param alterTblDesc
* alter table descriptor
@@ -222,7 +226,15 @@
public void setCreateTblDesc(CreateTableDesc createTblDesc) {
this.createTblDesc = createTblDesc;
}
+
+ public createIndexDesc getCreateIndexDesc() {
+ return createIndexDesc;
+ }
+ public void setCreateIndexDesc(createIndexDesc createIndexDesc) {
+ this.createIndexDesc = createIndexDesc;
+ }
+
/**
* @return the createTblDesc
*/
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/createIndexDesc.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/createIndexDesc.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/createIndexDesc.java (revision 0)
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class createIndexDesc extends DDLDesc implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+ String tableName;
+ String indexName;
+ List<String> indexedCols;
+ String inputFormat;
+ String outputFormat;
+ String serde;
+ String indexType;
+
+ public createIndexDesc() {
+ super();
+ }
+
+ public createIndexDesc(String tableName, String indexName,
+ List<String> indexedCols, String inputFormat, String outputFormat, String serde, String indexType) {
+ super();
+ this.tableName = tableName;
+ this.indexName = indexName;
+ this.indexedCols = indexedCols;
+ this.indexType = indexType;
+ this.inputFormat = inputFormat;
+ this.outputFormat = outputFormat;
+ this.serde = serde;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public void setTableName(String tableName) {
+ this.tableName = tableName;
+ }
+
+ public String getIndexName() {
+ return indexName;
+ }
+
+ public void setIndexName(String indexName) {
+ this.indexName = indexName;
+ }
+
+ public List<String> getIndexedCols() {
+ return indexedCols;
+ }
+
+ public void setIndexedCols(List<String> indexedCols) {
+ this.indexedCols = indexedCols;
+ }
+
+ public String getInputFormat() {
+ return inputFormat;
+ }
+
+ public void setInputFormat(String inputFormat) {
+ this.inputFormat = inputFormat;
+ }
+
+ public String getOutputFormat() {
+ return outputFormat;
+ }
+
+ public void setOutputFormat(String outputFormat) {
+ this.outputFormat = outputFormat;
+ }
+
+ public String getSerde() {
+ return serde;
+ }
+
+ public void setSerde(String serde) {
+ this.serde = serde;
+ }
+
+ public String getIndexType() {
+ return indexType;
+ }
+
+ public void setIndexType(String indexType) {
+ this.indexType = indexType;
+ }
+
+}
\ No newline at end of file
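
For context, the descriptor is meant to be built by the compiler and handed to DDLTask through the new DDLWork constructor. A minimal sketch; the table, index, storage-format, and serde names are illustrative, not taken from this patch:

    List<String> indexedCols = Arrays.asList("userid");
    createIndexDesc desc = new createIndexDesc("clicks", "clicks_idx", indexedCols,
        "org.apache.hadoop.hive.ql.io.RCFileInputFormat",        // illustrative input format
        "org.apache.hadoop.hive.ql.io.RCFileOutputFormat",       // illustrative output format
        "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe",  // illustrative serde
        "compact");                                              // one of the HiveIndex.IndexType names
    DDLWork ddlWork = new DDLWork(desc);  // DDLTask.execute() then routes to createIndex(db, desc)
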
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (revision 948363)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (working copy)
@@ -25,8 +25,13 @@
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.Reducer;
/**
* MapredWork.
@@ -65,7 +70,26 @@
private MapredLocalWork mapLocalWork;
private String inputformat;
-
+
+ // these fields are used for building indexes.
+ private Class<? extends Mapper> mapperClass;
+ private Class<? extends Reducer> reducerClass;
+ private Class<?> mapOutputKeyClass;
+ private Class<?> mapOutputValueClass;
+ private Class<?> outputKeyClass;
+ private Class<?> outputValueClass;
+ private Class<? extends InputFormat> inputFormatCls;
+ private Class<? extends OutputFormat> outputFormatCls;
+ private boolean compressed;
+ private String compressCodec;
+ private String outputPath;
+ private TableDesc indexTableDesc;
+
+ /*
+ * used for indexing.
+ */
+ private String indexCols;
+
public MapredWork() {
aliasToPartnInfo = new LinkedHashMap<String, PartitionDesc>();
}
@@ -240,7 +264,7 @@
@SuppressWarnings("nls")
public String isInvalid() {
- if ((getNumReduceTasks() >= 1) && (getReducer() == null)) {
+ if ((getNumReduceTasks() >= 1) && (getReducer() == null) && (getIndexCols() == null)) {
return "Reducers > 0 but no reduce operator";
}
@@ -321,5 +345,109 @@
public void setInputformat(String inputformat) {
this.inputformat = inputformat;
}
+
+ public String getIndexCols() {
+ return indexCols;
+ }
+ public void setIndexCols(String indexCols) {
+ this.indexCols = indexCols;
+ }
+
+ public Class<? extends Mapper> getMapperClass() {
+ return mapperClass;
+ }
+
+ public void setMapperClass(Class<? extends Mapper> mapperClass) {
+ this.mapperClass = mapperClass;
+ }
+
+ public Class<? extends Reducer> getReducerClass() {
+ return reducerClass;
+ }
+
+ public void setReducerClass(Class<? extends Reducer> reducerClass) {
+ this.reducerClass = reducerClass;
+ }
+
+ public Class<?> getMapOutputKeyClass() {
+ return mapOutputKeyClass;
+ }
+
+ public void setMapOutputKeyClass(Class<?> mapOutputKeyClass) {
+ this.mapOutputKeyClass = mapOutputKeyClass;
+ }
+
+ public Class<?> getMapOutputValueClass() {
+ return mapOutputValueClass;
+ }
+
+ public void setMapOutputValueClass(Class<?> mapOutputValueClass) {
+ this.mapOutputValueClass = mapOutputValueClass;
+ }
+
+ public Class<?> getOutputKeyClass() {
+ return outputKeyClass;
+ }
+
+ public void setOutputKeyClass(Class<?> outputKeyClass) {
+ this.outputKeyClass = outputKeyClass;
+ }
+
+ public Class<?> getOutputValueClass() {
+ return outputValueClass;
+ }
+
+ public void setOutputValueClass(Class<?> outputValueClass) {
+ this.outputValueClass = outputValueClass;
+ }
+
+ public Class<? extends InputFormat> getInputFormatCls() {
+ return inputFormatCls;
+ }
+
+ public void setInputFormatCls(Class<? extends InputFormat> inputFormatCls) {
+ this.inputFormatCls = inputFormatCls;
+ }
+
+ public Class<? extends OutputFormat> getOutputFormatCls() {
+ return outputFormatCls;
+ }
+
+ public void setOutputFormatCls(Class<? extends OutputFormat> outputFormat) {
+ this.outputFormatCls = outputFormat;
+ }
+
+ public boolean getCompressed() {
+ return compressed;
+ }
+
+ public void setCompressed(boolean isCompressed) {
+ this.compressed = isCompressed;
+ }
+
+ public String getCompressCodec() {
+ return compressCodec;
+ }
+
+ public void setCompressCodec(String compressCodec) {
+ this.compressCodec = compressCodec;
+ }
+
+ public String getOutputPath() {
+ return outputPath;
+ }
+
+ public void setOutputPath(String outputPath) {
+ this.outputPath = outputPath;
+ }
+
+ public TableDesc getIndexTableDesc() {
+ return indexTableDesc;
+ }
+
+ public void setIndexTableDesc(TableDesc indexTableDesc) {
+ this.indexTableDesc = indexTableDesc;
+ }
+
}
Index: ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (revision 948363)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (working copy)
@@ -432,6 +432,7 @@
NonSyncDataOutputBuffer valBuf = loadedColumnsValueBuffer[addIndex];
valBuf.reset();
+
valBuf.write(in, vaRowsLen);
if (codec != null) {
decompressedFlag[addIndex] = false;
@@ -1486,6 +1487,10 @@
public String toString() {
return file.toString();
}
+
+ public boolean isCompressedRCFile() {
+ return this.decompress;
+ }
/** Close the reader. */
public void close() {
Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (revision 948363)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (working copy)
@@ -179,8 +179,9 @@
*/
protected static Map<Class, InputFormat<WritableComparable, Writable>> inputFormats;
- static InputFormat<WritableComparable, Writable> getInputFormatFromCache(
- Class inputFormatClass, JobConf job) throws IOException {
+ public static InputFormat<WritableComparable, Writable> getInputFormatFromCache(
+ Class inputFormatClass, JobConf job) throws IOException {
+
if (inputFormats == null) {
inputFormats = new HashMap<Class, InputFormat<WritableComparable, Writable>>();
}
@@ -329,7 +330,7 @@
for (String alias : aliases) {
Operator<? extends Serializable> op = this.mrwork.getAliasToWork().get(
alias);
- if (op instanceof TableScanOperator) {
+ if (op != null && op instanceof TableScanOperator) {
TableScanOperator tableScan = (TableScanOperator) op;
ArrayList<Integer> list = tableScan.getNeededColumnIDs();
if (list != null) {
Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java (revision 948363)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java (working copy)
@@ -206,22 +206,30 @@
public static RecordWriter getHiveRecordWriter(JobConf jc,
TableDesc tableInfo, Class<? extends Writable> outputClass,
FileSinkDesc conf, Path outPath) throws HiveException {
+ String codecStr = conf.getCompressCodec();
+ String type = conf.getCompressType();
+ boolean compressed = conf.getCompressed();
+ return getHiveRecordWriter(jc, tableInfo, outputClass, compressed,
+ codecStr, type, outPath);
+ }
+
+ public static RecordWriter getHiveRecordWriter(JobConf jc,
+ TableDesc tableInfo, Class<? extends Writable> outputClass, boolean isCompressed,
+ String codecStr, String compressionType, Path outPath) throws HiveException {
try {
HiveOutputFormat<?, ?> hiveOutputFormat = tableInfo
.getOutputFileFormatClass().newInstance();
- boolean isCompressed = conf.getCompressed();
+
JobConf jc_output = jc;
if (isCompressed) {
jc_output = new JobConf(jc);
- String codecStr = conf.getCompressCodec();
if (codecStr != null && !codecStr.trim().equals("")) {
Class<? extends CompressionCodec> codec = (Class<? extends CompressionCodec>) Class
.forName(codecStr);
FileOutputFormat.setOutputCompressorClass(jc_output, codec);
}
- String type = conf.getCompressType();
- if (type != null && !type.trim().equals("")) {
- CompressionType style = CompressionType.valueOf(type);
+ if (compressionType != null && !compressionType.trim().equals("")) {
+ CompressionType style = CompressionType.valueOf(compressionType);
SequenceFileOutputFormat.setOutputCompressionType(jc, style);
}
}
Index: ql/src/java/org/apache/hadoop/hive/ql/io/FilterMapper.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/FilterMapper.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/FilterMapper.java (revision 0)
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+/*
+ * This is the mapper class for filtering the Hive streaming output for the given predicates.
+ * Conf Input: hive.index.pred = comma-separated list of values to search for
+ * hive.index.pred_pos = position of the predicate column in the input list (0 based)
+ * TODO this should come from the execution plan, but for now it is supplied by the client program
+ * Map Input: output from HiveStreaming's select clause
+ * Output: the input rows whose predicate column matches one of the given values
+ */
+public class FilterMapper extends MapReduceBase
+implements Mapper<WritableComparable, Writable, WritableComparable, Writable> {
+
+ private ArrayList<String> predicateValList = null;
+ private int predPos;
+ private boolean pr = true;
+ public static final Log l4j = LogFactory.getLog("FilterMapper");
+
+
+ @Override
+ public void configure(JobConf job) {
+
+ String predClause = job.get("hive.index.pred");
+ String[] predList = predClause.split(",");
+ if (predList.length < 1) {
+ throw new RuntimeException("Configure: predicate clause should have ip addresses seperated by a comma");
+ }
+
+ predicateValList = new ArrayList<String>(predList.length);
+ for (String pred : predList) {
+ predicateValList.add(pred);
+ }
+ for (String string : predList) {
+ l4j.info(string);
+ }
+ String predPosStr = job.get("hive.index.pred_pos");
+ predPos = Integer.parseInt(predPosStr);
+ }
+
+ public void map(WritableComparable key, Writable value,
+ OutputCollector<WritableComparable, Writable> output, Reporter reporter) throws IOException {
+ // key and value (ip and user) should be non-null
+ if((key == null) || (value == null)) {
+ return;
+ }
+ String [] cols = ((Text)value).toString().split("\t");
+ if (cols.length < predPos) {
+ if (pr) {
+ pr = false;
+ // there are not enough columns so just ignore this row
+ l4j.info("Number of columns: " + cols.length + " Predicate pos: " + predPos);
+ for (String string : cols) {
+ l4j.info(string);
+ }
+ }
+ return;
+ }
+ if(predPos == 0) {
+ String indexedKeyCol = ((Text)key).toString();
+ if(predicateValList.indexOf(indexedKeyCol) == -1) {
+ // current key is not in the predicate value list, so nothing to do for this row
+ return;
+ }
+ } else {
+ if(predicateValList.indexOf(cols[predPos-1]) == -1) {
+ return;
+ }
+ }
+
+// String viewTime = ((Text)key).toString();
+// dt.setTime(Long.parseLong(viewTime) * 1000);
+// outKey.set(sdf.format(dt));
+
+ // if it passes the equality predicate then just output it
+// output.collect(outKey, value);
+ output.collect(key, value);
+ }
+}
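
A minimal sketch of driving this mapper as a plain map-only job, using the two properties described in the class comment; the paths, predicate values, and output key/value types below are illustrative assumptions:

    JobConf conf = new JobConf(FilterMapper.class);
    conf.set("hive.index.pred", "10.0.0.1,10.0.0.2");  // values to keep
    conf.set("hive.index.pred_pos", "0");              // predicate is the key column
    conf.setMapperClass(FilterMapper.class);
    conf.setNumReduceTasks(0);                         // map-only filter
    conf.setOutputKeyClass(Text.class);                // assumed key/value types of the stream output
    conf.setOutputValueClass(Text.class);
    FileInputFormat.setInputPaths(conf, new Path("/tmp/stream_out"));            // illustrative
    FileOutputFormat.setOutputPath(conf, new Path("/tmp/stream_out_filtered"));  // illustrative
    JobClient.runJob(conf);
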
Index: ql/src/java/org/apache/hadoop/hive/ql/index/IndexBuilderSummaryMapper.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/index/IndexBuilderSummaryMapper.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/index/IndexBuilderSummaryMapper.java (revision 0)
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.index;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+public class IndexBuilderSummaryMapper extends IndexBuilderBaseMapper {
+
+ public void map(Object key, Object value, OutputCollector oc,
+ Reporter reporter) throws IOException {
+ try {
+ reporter.progress();
+ perpareMapper(key, value);
+ oc.collect(cachedIndexCols, cachedPos);
+ } catch (Throwable e) {
+ throw new RuntimeException("Error in indexer's mapper:", e);
+ }
+ }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndex.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndex.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndex.java (revision 0)
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.index;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.Vector;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.RCFile;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.LineRecordReader.LineReader;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Holds index-related constants and helper classes.
+ * The actual logic is spread across HiveStreamingRecordReader and the IndexReducer classes.
+ *
+ */
+public class HiveIndex {
+
+ public static final Log l4j = LogFactory.getLog("HiveIndex");
+
+ public static final Class INDEX_MAPRED_BOUN_SERDE = LazySimpleSerDe.class;
+
+ public static enum IndexType {
+ COMPACT_SUMMARY_TABLE("compact"),
+ SUMMARY_TABLE("summary"),
+ PROJECTION("projection");
+
+ private IndexType(String indexType) {
+ indexTypeName = indexType;
+ }
+
+ private String indexTypeName;
+
+ public String getName() {
+ return indexTypeName;
+ }
+ }
+
+ public static IndexType getIndexType(String name) {
+ IndexType[] types = IndexType.values();
+ for (IndexType type : types) {
+ if(type.getName().equals(name.toLowerCase()))
+ return type;
+ }
+ throw new IllegalArgumentException(name + " is not a valid index type.");
+ }
+
+
+ // modeled on sequence file record reader
+ public static class IndexSequenceFileRecordReader<K, V>
+ implements RecordReader<K, V> {
+
+ private SequenceFile.Reader in;
+ private long start;
+ private long end;
+ private boolean more = true;
+ protected Configuration conf;
+
+ public IndexSequenceFileRecordReader(Configuration conf, FileSplit split)
+ throws IOException {
+ Path path = split.getPath();
+ FileSystem fs = path.getFileSystem(conf);
+ this.in = new SequenceFile.Reader(fs, path, conf);
+ this.end = split.getStart() + split.getLength();
+ this.conf = conf;
+
+ if (split.getStart() > in.getPosition())
+ in.sync(split.getStart()); // sync to start
+
+ this.start = in.getPosition();
+ more = start < end;
+ }
+
+ public void sync(long offset) throws IOException {
+ if(offset > end) {
+ offset = end;
+ }
+ in.sync(offset);
+ this.start = in.getPosition();
+ more = start < end;
+ }
+
+ /**
+ * The class of key that must be passed to {@link
+ * #next(WritableComparable,Writable)}..
+ */
+ public Class getKeyClass() {
+ return in.getKeyClass();
+ }
+
+ /**
+ * The class of value that must be passed to {@link
+ * #next(WritableComparable,Writable)}..
+ */
+ public Class getValueClass() {
+ return in.getValueClass();
+ }
+
+ @SuppressWarnings("unchecked")
+ public K createKey() {
+ return (K) ReflectionUtils.newInstance(getKeyClass(), conf);
+ }
+
+ @SuppressWarnings("unchecked")
+ public V createValue() {
+ return (V) ReflectionUtils.newInstance(getValueClass(), conf);
+ }
+
+ public synchronized boolean next(K key, V value) throws IOException {
+ if (!more)
+ return false;
+ long pos = in.getPosition();
+ boolean eof = in.next(key, value);
+ if (pos >= end && in.syncSeen()) {
+ more = false;
+ } else {
+ more = eof;
+ }
+ return more;
+ }
+
+ public synchronized boolean next(K key) throws IOException {
+ if (!more)
+ return false;
+ long pos = in.getPosition();
+ boolean eof = in.next(key);
+ if (pos >= end && in.syncSeen()) {
+ more = false;
+ } else {
+ more = eof;
+ }
+ return more;
+ }
+
+ public synchronized void getCurrentValue(V value) throws IOException {
+ in.getCurrentValue(value);
+ }
+
+ /**
+ * Return the progress within the input split
+ *
+ * @return 0.0 to 1.0 of the input byte range
+ */
+ public float getProgress() throws IOException {
+ if (end == start) {
+ return 0.0f;
+ } else {
+ return Math.min(1.0f, (in.getPosition() - start)
+ / (float) (end - start));
+ }
+ }
+
+ public synchronized long getPos() throws IOException {
+ return in.getPosition();
+ }
+
+ public synchronized void seek(long pos) throws IOException {
+ in.seek(pos);
+ }
+
+ public synchronized void close() throws IOException {
+ in.close();
+ }
+
+ public boolean syncSeen() {
+ return in.syncSeen();
+ }
+ }
+
+ //IndexBucket
+ public static class IBucket {
+ private String name = null;
+ private SortedSet<Long> offsets = new TreeSet<Long>();
+ public IBucket(String n) {
+ name = n;
+ }
+ public void add(Long offset) {
+ offsets.add(offset);
+ }
+ public String getName() {
+ return name;
+ }
+ public SortedSet<Long> getOffsets() {
+ return offsets;
+ }
+ public boolean equals(Object obj) {
+ if(obj.getClass() != this.getClass()) {
+ return false;
+ }
+ return (((IBucket)obj).name.compareToIgnoreCase(this.name) == 0);
+ }
+ }
+
+ public static class HiveIndexResult {
+ private HashMap<String, SortedSet<Long>> indexResult = new HashMap<String, SortedSet<Long>>();
+ JobConf job = null;
+
+ BytesRefWritable[] bytesRef = new BytesRefWritable[2];
+
+ public HiveIndexResult(String indexFile, JobConf conf) throws IOException, HiveException {
+ job = conf;
+
+ bytesRef[0] = new BytesRefWritable();
+ bytesRef[1] = new BytesRefWritable();
+
+ if(indexFile != null) {
+ Path indexFilePath = new Path(indexFile);
+ FileSystem fs = FileSystem.get(conf);
+ FileStatus indexStat = fs.getFileStatus(indexFilePath);
+ List<Path> paths = new ArrayList<Path>();
+ if(indexStat.isDir()) {
+ FileStatus[] fss = fs.listStatus(indexFilePath);
+ for (FileStatus f : fss) {
+ paths.add(f.getPath());
+ }
+ } else {
+ paths.add(indexFilePath);
+ }
+
+ for(Path indexFinalPath : paths) {
+ FSDataInputStream ifile = fs.open(indexFinalPath);
+ LineReader lr = new LineReader(ifile, conf);
+ Text line = new Text();
+ while( lr.readLine(line) > 0) {
+ add(line);
+ }
+ // this will close the input stream
+ lr.close();
+ }
+ }
+ }
+
+
+ Map<String, IBucket> buckets = new HashMap<String, IBucket>();
+
+ private void add(Text line) throws HiveException {
+ String l = line.toString();
+ byte[] bytes = l.getBytes();
+ int firstEnd = 0;
+ int i = 0;
+ for (int index = 0; index < bytes.length; index++) {
+ if (bytes[index] == LazySimpleSerDe.DefaultSeparators[0]) {
+ i++;
+ firstEnd = index;
+ }
+ }
+ if (i > 1) {
+ throw new HiveException(
+ "Bad index file row (index file should only contain two columns: bucket_file_name and offset lists.) ."
+ + line.toString());
+ }
+ String bucketFileName = new String(bytes, 0, firstEnd);
+ IBucket bucket = buckets.get(bucketFileName);
+ if(bucket == null) {
+ bucket = new IBucket(bucketFileName);
+ buckets.put(bucketFileName, bucket);
+ }
+
+ int currentStart = firstEnd + 1;
+ int currentEnd = firstEnd + 1;
+ for (; currentEnd < bytes.length; currentEnd++) {
+ if (bytes[currentEnd] == LazySimpleSerDe.DefaultSeparators[1]) {
+ String one_offset = new String (bytes, currentStart, currentEnd - currentStart);
+ Long offset = Long.parseLong(one_offset);
+ bucket.getOffsets().add(offset);
+ currentStart = currentEnd + 1;
+ }
+ }
+ String one_offset = new String(bytes, currentStart, currentEnd
+ - currentStart);
+ bucket.getOffsets().add(Long.parseLong(one_offset));
+ }
+
+
+ public boolean contains(FileSplit split) throws HiveException {
+
+ if(buckets == null) {
+ return false;
+ }
+ String bucketName = split.getPath().toString();
+ IBucket bucket = buckets.get(bucketName);
+ if(bucket == null) {
+ bucketName = split.getPath().toUri().getPath();
+ bucket = buckets.get(bucketName);
+ if(bucket == null) {
+ return false;
+ }
+ }
+
+ for (Long offset : bucket.getOffsets()) {
+ if ( (offset >= split.getStart()) && (offset <= split.getStart() + split.getLength())) {
+ return true;
+ }
+ }
+ return false;
+ }
+ }
+
+}
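
A minimal sketch of the read side, assuming a compact index has already been written under an illustrative warehouse path; the calling method is assumed to declare IOException and HiveException:

    JobConf job = new JobConf();
    HiveIndex.IndexType type = HiveIndex.getIndexType("compact");  // COMPACT_SUMMARY_TABLE
    HiveIndex.HiveIndexResult index =
        new HiveIndex.HiveIndexResult("/user/hive/warehouse/clicks_idx", job);  // illustrative path
    FileSplit candidate = new FileSplit(
        new Path("/user/hive/warehouse/clicks/000000_0"), 0L, 67108864L, (String[]) null);
    if (index.contains(candidate)) {
        // at least one indexed offset falls inside this split, so keep it
    }
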
Index: ql/src/java/org/apache/hadoop/hive/ql/index/IndexBuilderBaseReducer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/index/IndexBuilderBaseReducer.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/index/IndexBuilderBaseReducer.java (revision 0)
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.index;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+public abstract class IndexBuilderBaseReducer extends MapReduceBase implements
+ Reducer<WritableComparable, Writable, WritableComparable, Writable> {
+
+ Serializer serializer;
+ Deserializer deserializer;
+ Class<? extends Writable> outputClass;
+ RecordWriter outWriter;
+
+ Path finalPath;
+ FileSystem fs;
+
+ String[] indexColumns;
+ StructObjectInspector rowObjectInspector;
+ StructObjectInspector keyObjectInspector;
+ StandardStructObjectInspector indexRowOutputObjInspector;
+ List<StructField> structFields = new ArrayList<StructField>();
+ List<ObjectInspector> indexColsInspectors;
+
+ boolean exception = false;
+ boolean autoDelete = false;
+ Path outPath;
+
+ Object[] indexOutputObjects;
+
+ public void configure(JobConf job) {
+ MapredWork conf = Utilities.getMapRedWork(job);
+ TableDesc table = conf.getIndexTableDesc();
+ try {
+ serializer = (Serializer) table.getDeserializerClass().newInstance();
+ serializer.initialize(job, table.getProperties());
+ outputClass = serializer.getSerializedClass();
+
+ indexColumns = conf.getIndexCols().split(",");
+ indexOutputObjects = new Object[indexColumns.length];
+ indexColsInspectors = new ArrayList<ObjectInspector>();
+ List<ObjectInspector> indexOutputRowInspectors = new ArrayList<ObjectInspector>();
+ List<String> outputColName = new ArrayList<String>();
+ List<String> indexColNames = new ArrayList<String>();
+ deserializer = (Deserializer) HiveIndex.INDEX_MAPRED_BOUN_SERDE
+ .newInstance();
+ deserializer.initialize(job, table.getProperties());
+ rowObjectInspector = (StructObjectInspector) deserializer
+ .getObjectInspector();
+ for (int k = 0; k < indexColumns.length - 2; k++) {
+ String col = indexColumns[k];
+ StructField field = rowObjectInspector.getStructFieldRef(col);
+ structFields.add(field);
+ ObjectInspector inspector = field.getFieldObjectInspector();
+ if (!inspector.getCategory().equals(Category.PRIMITIVE)) {
+ throw new RuntimeException("Only primitive columns can be indexed.");
+ }
+ outputColName.add(col);
+ indexColNames.add(col);
+ indexColsInspectors.add(inspector);
+ indexOutputRowInspectors.add(ObjectInspectorUtils
+ .getStandardObjectInspector(inspector, ObjectInspectorCopyOption.JAVA));
+ }
+
+ for (int k = indexColumns.length - 2; k < indexColumns.length; k++) {
+ String col = indexColumns[k];
+ StructField field = rowObjectInspector.getStructFieldRef(col);
+ structFields.add(field);
+ ObjectInspector inspector = field.getFieldObjectInspector();
+ outputColName.add(col);
+ indexOutputRowInspectors.add(ObjectInspectorUtils
+ .getStandardObjectInspector(inspector, ObjectInspectorCopyOption.JAVA));
+ }
+
+ keyObjectInspector = ObjectInspectorFactory
+ .getStandardStructObjectInspector(indexColNames,
+ indexColsInspectors);
+ indexRowOutputObjInspector = ObjectInspectorFactory
+ .getStandardStructObjectInspector(outputColName,
+ indexOutputRowInspectors);
+
+ boolean isCompressed = conf.getCompressed();
+ String codecStr = conf.getCompressCodec();
+ //the job's final output path
+ String specPath = conf.getOutputPath();
+ Path tmpPath = Utilities.toTempPath(specPath);
+ String taskId = Utilities.getTaskId(job);
+ finalPath = new Path(tmpPath, taskId);
+ outPath = new Path(tmpPath, Utilities.toTempPath(taskId));
+ fs = (new Path(specPath)).getFileSystem(job);
+ outWriter = HiveFileFormatUtils.getHiveRecordWriter(job, table,
+ outputClass, isCompressed, codecStr, "", outPath);
+ HiveOutputFormat<?, ?> hiveOutputFormat = table
+ .getOutputFileFormatClass().newInstance();
+
+ finalPath = HiveFileFormatUtils.getOutputFormatFinalPath(tmpPath, taskId,
+ job, hiveOutputFormat, isCompressed, finalPath);
+ autoDelete = ShimLoader.getHadoopShims().fileSystemDeleteOnExit(fs, outPath);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void reduce(WritableComparable key, Iterator<Writable> values,
+ OutputCollector<WritableComparable, Writable> output, Reporter reporter) throws IOException {
+ try {
+ Object row = deserializer.deserialize((Writable) key);
+ for (int i = 0; i < indexColumns.length - 2; i++) {
+ this.indexOutputObjects[i] = ObjectInspectorUtils.copyToStandardObject(
+ rowObjectInspector.getStructFieldData(row, structFields.get(i)),
+ indexColsInspectors.get(i), ObjectInspectorCopyOption.JAVA);
+ }
+ doReduce(this.indexOutputObjects, values, reporter);
+ } catch (Throwable e) {
+ this.exception = true;
+ close();
+ throw new IOException(e);
+ }
+ }
+
+ public abstract void doReduce(Object[] keys, Iterator values,
+ Reporter reporter) throws IOException, SerDeException;
+
+ @Override
+ public void close() throws IOException {
+ // close writer
+ if (outWriter != null) {
+ outWriter.close(exception);
+ outWriter = null;
+ }
+
+ if (!exception) {
+ FileStatus fss = fs.getFileStatus(outPath);
+ System.out.println("renamed path " + outPath + " to " + finalPath
+ + " . File size is " + fss.getLen());
+ if (!fs.rename(outPath, finalPath)) {
+ throw new IOException("Unable to rename output to " + finalPath);
+ }
+ } else {
+ if(!autoDelete) {
+ fs.delete(outPath, true);
+ }
+ }
+ }
+
+ public static void indexBuilderJobClose(String outputPath, boolean success,
+ JobConf job, LogHelper console) throws HiveException, IOException {
+ FileSystem fs = (new Path(outputPath)).getFileSystem(job);
+ Path tmpPath = Utilities.toTempPath(outputPath);
+ Path intermediatePath = new Path(tmpPath.getParent(), tmpPath.getName() + ".intermediate");
+ Path finalPath = new Path(outputPath);
+ System.out.println("tmpPath is " + tmpPath);
+ System.out.println("intemediatePath is " + intermediatePath);
+ System.out.println("finalPath is " + finalPath);
+ if (success) {
+ if (fs.exists(tmpPath)) {
+
+ if(fs.exists(intermediatePath)) {
+ fs.delete(intermediatePath, true);
+ }
+ fs.mkdirs(intermediatePath);
+ FileStatus[] fss = fs.listStatus(tmpPath);
+ for (FileStatus f : fss) {
+ fs.rename(f.getPath(), new Path(intermediatePath, f.getPath()
+ .getName()));
+ }
+
+ fss = fs.listStatus(intermediatePath);
+ long len = 0;
+ for (FileStatus f : fss) {
+ len += f.getLen();
+ }
+ console.printInfo("IntermediatePath Path's file number is " + fss.length + ", total size is "
+ + len);
+
+ Utilities.removeTempOrDuplicateFiles(fs, intermediatePath);
+
+ fss = fs.listStatus(finalPath);
+ for (FileStatus f : fss) {
+ fs.delete(f.getPath(), true);
+ }
+ Utilities.renameOrMoveFiles(fs, intermediatePath, finalPath);
+ fs.delete(tmpPath, true);
+ fs.delete(intermediatePath, true);
+
+ fss = fs.listStatus(finalPath);
+ len = 0;
+ for (FileStatus f : fss) {
+ len += f.getLen();
+ }
+ console.printInfo("Final Path's file number is " + fss.length + ", total size is "
+ + len);
+ }
+ } else {
+ fs.delete(tmpPath, true);
+ }
+ }
+
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/index/IndexBuilderCompactSumReducer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/index/IndexBuilderCompactSumReducer.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/index/IndexBuilderCompactSumReducer.java (revision 0)
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.index;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This class is the reducer for the CREATE INDEX Hive command.
+ * Input: <index key columns, list of (bucket file name, offset)> pairs.
+ * Output: one row per (index key, bucket file name) written to the index table,
+ * holding the indexed column values, the bucket file name, and the sorted offsets.
+ *
+ */
+public class IndexBuilderCompactSumReducer extends IndexBuilderBaseReducer {
+
+ private Map<String, SortedSet<Long>> bucketOffsetMap = new HashMap<String, SortedSet<Long>>();
+
+ private Writable outVal;
+
+ //private NullWritable key = NullWritable.get();
+
+ public IndexBuilderCompactSumReducer() {
+ }
+
+ public void doReduce(Object[] keys, Iterator values,
+ Reporter reporter) throws IOException, SerDeException {
+ reporter.progress();
+ bucketOffsetMap.clear();
+ while (values.hasNext()) {
+ IndexEntryValueCell value = (IndexEntryValueCell) values.next();
+ String bucketName = value.getBucketName();
+ long offset = value.getPosition();
+ SortedSet<Long> bucketPos = bucketOffsetMap.get(bucketName);
+ if (bucketPos == null) {
+ bucketPos = new TreeSet<Long>();
+ bucketOffsetMap.put(bucketName, bucketPos);
+ }
+ if (!bucketPos.contains(offset))
+ bucketPos.add(offset);
+ }
+
+ Iterator<String> it = bucketOffsetMap.keySet().iterator();
+ List<String> offsets = new ArrayList<String>();
+ while (it.hasNext()) {
+ String bucketName = it.next();
+ this.indexOutputObjects[this.indexColumns.length-2] = bucketName;
+ SortedSet<Long> poses = bucketOffsetMap.get(bucketName);
+ Iterator<Long> posIter = poses.iterator();
+ offsets.clear();
+ while (posIter.hasNext()) {
+ offsets.add(posIter.next().toString());
+ }
+ this.indexOutputObjects[this.indexColumns.length-1] = offsets;
+ outVal = this.serializer.serialize(this.indexOutputObjects, indexRowOutputObjInspector);
+ this.outWriter.write(outVal);
+ }
+
+ }
+
+}
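
Concretely, for each index key the reducer emits one output row per bucket file containing that key; the row carries the indexed column values, the bucket file name, and the sorted offsets, roughly as in this illustrative sketch (all values made up):

    // Contents of indexOutputObjects for key userid = "42" found at three
    // positions inside one bucket file:
    Object[] row = new Object[] {
        "42",                                             // indexed column value(s)
        "hdfs://nn/user/hive/warehouse/clicks/000000_0",  // _bucketname
        Arrays.asList("1024", "52736", "104448")          // _offsets, kept as strings
    };
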
Index: ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexRecordReader.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexRecordReader.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexRecordReader.java (revision 0)
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.index;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.io.RCFile;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.RecordReader;
+
+public class HiveIndexRecordReader implements IndexRecordReader {
+
+ private RecordReader rawReader;
+
+ private LongWritable cachedKey = new LongWritable();
+
+ private Object rawKey;
+
+ private boolean blockPointer = false;
+ private long blockStart = -1;
+
+ private long currentBlockStart = 0;
+ private long nextBlockStart = -1;
+
+ public HiveIndexRecordReader(RecordReader recordReader) throws IOException {
+ this.rawReader = recordReader;
+ rawKey = recordReader.createKey();
+ this.currentBlockStart = this.rawReader.getPos();
+ }
+
+ public void close() throws IOException {
+ rawReader.close();
+ }
+
+ public LongWritable createKey() {
+ return cachedKey;
+ }
+
+ public Object createValue() {
+ return rawReader.createValue();
+ }
+
+ public long getPos() throws IOException {
+ return rawReader.getPos();
+ }
+
+ public float getProgress() throws IOException {
+ return rawReader.getProgress();
+ }
+
+ /**
+ * the key object from raw record reader is throw away here, and is not passed
+ * to upper.
+ */
+ public boolean next(Object key, Object value) throws IOException {
+ boolean result = rawReader.next(rawKey, value);
+ ((LongWritable) key).set(this.getCurrentValIndexOffset());
+ return result;
+ }
+
+ public void setBlockPointer(boolean b) {
+ blockPointer = b;
+ }
+
+ public void setBlockStart(long blockStart) {
+ this.blockStart = blockStart;
+ }
+
+ @Override
+ public long getCurrentValIndexOffset() throws IOException {
+ if(this.rawReader instanceof IndexRecordReader) {
+ return ((IndexRecordReader)this.rawReader).getCurrentValIndexOffset();
+ }
+
+ long ret = this.currentBlockStart;
+ long pointerPos = this.rawReader.getPos();
+ if(this.nextBlockStart == -1) {
+ this.nextBlockStart = pointerPos;
+ return ret;
+ }
+
+ if (pointerPos != this.nextBlockStart) {
+ // the reader position has moved to the end of the next block, or the end of
+ // the next record ("next" is really the current one, because the read has
+ // already been done above).
+
+ this.currentBlockStart = this.nextBlockStart;
+ this.nextBlockStart = pointerPos;
+ if(blockPointer) {
+ // we need the beginning of the current block.
+ ret = this.currentBlockStart;
+ }
+ }
+
+ return ret;
+ }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/index/IndexBuilderSummaryReducer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/index/IndexBuilderSummaryReducer.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/index/IndexBuilderSummaryReducer.java (revision 0)
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.index;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Reporter;
+
+public class IndexBuilderSummaryReducer extends IndexBuilderBaseReducer {
+
+ private StringBuilder bl = null;
+ private Text outVal = new Text();
+
+ private NullWritable key = NullWritable.get();
+
+ public IndexBuilderSummaryReducer() {
+ }
+
+ public void doReduce(Object[] keys, Iterator values, Reporter reporter) throws IOException {
+// reporter.progress();
+// while (values.hasNext()) {
+// bl = new StringBuilder();
+// // would toString() change the byte order?
+// String keyPart = Text.decode(((HiveKey) key).getBytes());
+// bl.append(keyPart);
+// bl.append(HiveIndex.KEY_VAL_LIST_SEPARATOR);
+// IndexEntryValueCell value = (IndexEntryValueCell) values.next();
+// String bucketName = value.getBucketName();
+// bl.append(bucketName);
+// bl.append(HiveIndex.BUCKET_POS_VAL_SEPARATOR);
+// bl.append(value.getPosition());
+// outVal.set(bl.toString());
+// this.outWriter.write(outVal);
+// }
+ }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/index/IndexEntryValueCell.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/index/IndexEntryValueCell.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/index/IndexEntryValueCell.java (revision 0)
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.index;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+import org.apache.hadoop.io.WritableUtils;
+
+public class IndexEntryValueCell implements Writable {
+
+ public String getBucketName() {
+ return bucketName;
+ }
+
+ public void setBucketName(String bucketName) {
+ this.bucketName = bucketName;
+ }
+
+ public long getPosition() {
+ return position;
+ }
+
+ public void setPosition(long position) {
+ this.position = position;
+ }
+
+ private String bucketName;
+ private long position;
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ bucketName = WritableUtils.readString(in);
+ position = in.readLong();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ WritableUtils.writeString(out, bucketName);
+ out.writeLong(position);
+ }
+
+ static { // register a ctor
+ WritableFactories.setFactory(IndexEntryValueCell.class, new WritableFactory() {
+ public Writable newInstance() {
+ return new IndexEntryValueCell();
+ }
+ });
+ }
+
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/index/IndexBuilderFileFormat.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/index/IndexBuilderFileFormat.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/index/IndexBuilderFileFormat.java (revision 0)
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.index;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.io.RCFile;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+public class IndexBuilderFileFormat
+ extends HiveInputFormat {
+
+ public RecordReader getRecordReader(InputSplit split, JobConf job,
+ Reporter reporter) throws IOException {
+
+ HiveInputSplit hsplit = (HiveInputSplit) split;
+
+ InputSplit inputSplit = hsplit.getInputSplit();
+ String inputFormatClassName = null;
+ Class inputFormatClass = null;
+ try {
+ inputFormatClassName = hsplit.inputFormatClassName();
+ inputFormatClass = Class.forName(inputFormatClassName);
+ } catch (Exception e) {
+ throw new IOException("cannot find class " + inputFormatClassName);
+ }
+
+ InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
+ boolean blockPointer = false;
+ long blockStart = -1;
+ FileSplit fileSplit = (FileSplit) split;
+ Path path = fileSplit.getPath();
+ FileSystem fs = path.getFileSystem(job);
+ if (inputFormatClass.getName().contains("SequenceFile")) {
+ SequenceFile.Reader in = new SequenceFile.Reader(fs, path, job);
+ blockPointer = in.isBlockCompressed();
+ in.sync(fileSplit.getStart());
+ blockStart = in.getPosition();
+ in.close();
+ } else if (inputFormatClass.getName().contains("RCFile")) {
+ RCFile.Reader in = new RCFile.Reader(fs, path, job);
+ blockPointer = true;
+ in.sync(fileSplit.getStart());
+ blockStart = in.getPosition();
+ in.close();
+ }
+
+ HiveIndexRecordReader indexReader = new HiveIndexRecordReader(inputFormat
+ .getRecordReader(inputSplit, job, reporter));
+
+ if (blockPointer) {
+ indexReader.setBlockPointer(true);
+ indexReader.setBlockStart(blockStart);
+ }
+
+ return indexReader;
+ }
+
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/index/io/HiveIndexInputFormat.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/index/io/HiveIndexInputFormat.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/index/io/HiveIndexInputFormat.java (revision 0)
@@ -0,0 +1,75 @@
+package org.apache.hadoop.hive.ql.index.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.index.HiveIndex.HiveIndexResult;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.io.RCFile;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+public class HiveIndexInputFormat extends HiveInputFormat {
+
+ public static final Log l4j = LogFactory.getLog("HiveIndexInputFormat");
+
+ public HiveIndexInputFormat() {
+ super();
+ }
+
+ @Override
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ String indexFileStr = job.get("hive.exec.index_file");
+ l4j.info("index_file is " + indexFileStr);
+ if (indexFileStr == null) {
+ // not an index-accessed plan, so there is nothing useful we can do here
+ // return super.getSplits(job, numSplits);
+ throw new IOException("No index file.");
+ }
+ HiveInputSplit[] splits = (HiveInputSplit[]) super
+ .getSplits(job, numSplits);
+
+ HiveIndexResult hiveIndexResult = null;
+ try {
+ hiveIndexResult = new HiveIndexResult(indexFileStr, job);
+ } catch (HiveException e) {
+ // the index could not be read; fall back to all the file splits
+ l4j.error("Unable to read index so we will go with all the file splits.");
+ e.printStackTrace();
+ }
+
+ ArrayList<HiveInputSplit> newSplits = new ArrayList<HiveInputSplit>(
+ numSplits);
+ for (HiveInputSplit split : splits) {
+ l4j.info("split start : " + split.getStart());
+ l4j.info("split end : " + (split.getStart() + split.getLength()));
+
+ try {
+ if (hiveIndexResult.contains(split)) {
+ // we may miss a sync here
+ HiveInputSplit newSplit = split;
+ if (split.getStart() > RCFile.SYNC_INTERVAL) {
+ newSplit = new HiveInputSplit(new FileSplit(split.getPath(), split
+ .getStart()
+ - RCFile.SYNC_INTERVAL, split.getLength()
+ + RCFile.SYNC_INTERVAL, split.getLocations()), split
+ .inputFormatClassName());
+ }
+
+ newSplits.add(newSplit);
+ }
+ } catch (HiveException e) {
+ throw new RuntimeException(
+ "Unable to get metadata for input table split" + split.getPath());
+ }
+ }
+ InputSplit retA[] = newSplits.toArray((new FileSplit[newSplits.size()]));
+ l4j.info("Number of input splits: " + splits.length + " new input splits: "
+ + retA.length);
+ return retA;
+ }
+}
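
A minimal sketch of pointing this input format at a previously built index, assuming the job has otherwise been prepared the way ExecDriver normally prepares it (plan serialized, input paths added); the index and table paths are illustrative, and getSplits() throws IOException, so the surrounding method must declare or handle it:

    JobConf job = new JobConf();
    job.set("hive.exec.index_file", "/user/hive/warehouse/clicks_idx");          // illustrative
    job.setInputFormat(HiveIndexInputFormat.class);
    FileInputFormat.setInputPaths(job, new Path("/user/hive/warehouse/clicks")); // base table data
    InputSplit[] prunedSplits = new HiveIndexInputFormat().getSplits(job, 1);
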
Index: ql/src/java/org/apache/hadoop/hive/ql/index/IndexRecordReader.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/index/IndexRecordReader.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/index/IndexRecordReader.java (revision 0)
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.index;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.RecordReader;
+
+public interface IndexRecordReader extends RecordReader {
+
+ public long getCurrentValIndexOffset() throws IOException;
+
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/index/IndexBuilderProjectMapper.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/index/IndexBuilderProjectMapper.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/index/IndexBuilderProjectMapper.java (revision 0)
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.index;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+public class IndexBuilderProjectMapper extends IndexBuilderBaseMapper {
+
+ public void map(Object key, Object value, OutputCollector oc,
+ Reporter reporter) throws IOException {
+ try {
+ reporter.progress();
+ perpareMapper(key, value);
+ oc.collect(cachedIndexCols, cachedIndexCols);
+ } catch (Throwable e) {
+ throw new RuntimeException("Error in indexer's mapper:", e);
+ }
+ }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/index/IndexBuilderBaseMapper.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/index/IndexBuilderBaseMapper.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/index/IndexBuilderBaseMapper.java (revision 0)
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.index;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.PrimitiveTypeEntry;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+
+public abstract class IndexBuilderBaseMapper extends MapReduceBase implements Mapper {
+
+ private JobConf jc;
+ private MapredWork conf;
+
+ private Deserializer deserializer;
+ private StructObjectInspector rowObjectInspector;
+
+ public final Log LOG = LogFactory.getLog("IndexBuilderBaseMapper");
+
+ private String bucketName;
+ String[] indexColumns;
+
+ private List<StructField> structFields = new ArrayList<StructField>();
+
+ protected HiveKey cachedIndexCols = new HiveKey();
+
+ private Object[] cachedKeys;
+
+ protected IndexEntryValueCell cachedPos;
+
+ transient Serializer keySerializer;
+ transient StructObjectInspector keyObjectInspector;
+ transient boolean keyIsText;
+
+ public IndexBuilderBaseMapper() {
+ }
+
+ public void configure(JobConf job) {
+ try {
+ jc = job;
+
+ conf = Utilities.getMapRedWork(job);
+ bucketName = jc.get("map.input.file");
+ TableDesc table = conf.getKeyDesc();
+
+ Properties p = table.getProperties();
+ Class<? extends Deserializer> deserializerClass = table
+ .getDeserializerClass();
+ if (deserializerClass == null) {
+ String className = table.getSerdeClassName();
+ if (className == null || className.isEmpty()) {
+ throw new HiveException(
+ "SerDe class or the SerDe class name is not set for table: "
+ + table.getProperties().getProperty("name"));
+ }
+ deserializerClass = (Class<? extends Deserializer>) getClass()
+ .getClassLoader().loadClass(className);
+ }
+
+ deserializer = (Deserializer) deserializerClass.newInstance();
+ deserializer.initialize(jc, p);
+ rowObjectInspector = (StructObjectInspector) deserializer
+ .getObjectInspector();
+ indexColumns = conf.getIndexCols().split(",");
+
+ List<ObjectInspector> fieldObjectInspectors = new ArrayList<ObjectInspector>();
+ List<String> outputColName = new ArrayList<String>();
+ // Only the leading columns are index keys; the last two index-table columns
+ // carry bookkeeping data (bucket file name and row offsets) and are skipped.
+ for (int k = 0; k < indexColumns.length - 2; k++) {
+ String col = indexColumns[k];
+ StructField field = rowObjectInspector.getStructFieldRef(col);
+ structFields.add(field);
+ ObjectInspector inspector = field.getFieldObjectInspector();
+ if (!inspector.getCategory().equals(Category.PRIMITIVE)) {
+ throw new RuntimeException("Only primitive columns can be indexed.");
+ }
+ outputColName.add(col);
+ fieldObjectInspectors.add(inspector);
+ }
+
+ keyObjectInspector = ObjectInspectorFactory
+ .getStandardStructObjectInspector(outputColName,
+ fieldObjectInspectors);
+
+ TableDesc keyTableDesc = conf.getIndexTableDesc();
+ keySerializer = (Serializer) HiveIndex.INDEX_MAPRED_BOUN_SERDE.newInstance();
+ keySerializer.initialize(jc, keyTableDesc.getProperties());
+ keyIsText = keySerializer.getSerializedClass().equals(Text.class);
+
+ cachedKeys = new Object[indexColumns.length - 2];
+ cachedPos = (IndexEntryValueCell) WritableFactories.newInstance(IndexEntryValueCell.class);
+ cachedPos.setBucketName(bucketName);
+
+ } catch (Exception e) {
+ LOG.error("Error in building index: " + e.getMessage(), e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected void perpareMapper(Object key, Object value) throws SerDeException {
+ Object row = deserializer.deserialize((Writable) value);
+ for (int i = 0; i < indexColumns.length - 2; i++) {
+ cachedKeys[i] = rowObjectInspector.getStructFieldData(row, structFields
+ .get(i));
+ }
+
+ if (keyIsText) {
+ Text k = (Text) keySerializer.serialize(cachedKeys, keyObjectInspector);
+ cachedIndexCols.set(k.getBytes(), 0, k.getLength());
+ } else {
+ // Must be BytesWritable
+ BytesWritable k = (BytesWritable) keySerializer.serialize(cachedKeys,
+ keyObjectInspector);
+ cachedIndexCols.set(k.get(), 0, k.getSize());
+ }
+
+ int keyHashCode = 0;
+ for (int i = 0; i < cachedKeys.length; i++) {
+ Object o = cachedKeys[i];
+ keyHashCode = keyHashCode * 31
+ + ObjectInspectorUtils.hashCode(o, structFields.get(i).getFieldObjectInspector());
+ }
+ cachedIndexCols.setHashCode(keyHashCode);
+
+ cachedPos.setPosition(((LongWritable) key).get());
+ }
+}
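
The partitioning hash built in perpareMapper() combines the per-column hashes with the usual multiply-by-31 rule; a stand-alone illustration of the same combination follows (plain Object.hashCode() stands in for ObjectInspectorUtils.hashCode(), which the mapper uses so the hash is taken over the deserialized column values):

    public class IndexKeyHashSketch {
      // Same combining rule as IndexBuilderBaseMapper: hash = hash * 31 + hash(col i).
      public static int combineKeyHash(Object[] keyValues) {
        int hash = 0;
        for (Object o : keyValues) {
          hash = hash * 31 + (o == null ? 0 : o.hashCode());
        }
        return hash;
      }

      public static void main(String[] args) {
        // Example with two indexed columns of a row; the values are placeholders.
        System.out.println(combineKeyHash(new Object[] { "238", "val_238" }));
      }
    }
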
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g (revision 948363)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g (working copy)
@@ -85,6 +85,8 @@
TOK_STRUCT;
TOK_MAP;
TOK_CREATETABLE;
+TOK_CREATEINDEX;
+TOK_UPDATEINDEX;
TOK_LIKETABLE;
TOK_DESCTABLE;
TOK_DESCFUNCTION;
@@ -215,6 +217,8 @@
| createViewStatement
| dropViewStatement
| createFunctionStatement
+ | createIndexStatement
+ | updateIndexStatement
| dropFunctionStatement
;
@@ -255,6 +259,20 @@
)
;
+createIndexStatement
+@init { msgs.push("create index statement");}
+@after {msgs.pop();}
+ : KW_CREATE KW_INDEX indexName=Identifier KW_TYPE typeName=Identifier KW_ON KW_TABLE tab=Identifier LPAREN indexedCols=columnNameList RPAREN tableFileFormat?
+ ->^(TOK_CREATEINDEX $indexName $typeName $tab $indexedCols tableFileFormat?)
+ ;
+
+updateIndexStatement
+@init { msgs.push("update index statement");}
+@after {msgs.pop();}
+ : KW_UPDATE KW_INDEX indexName=Identifier partitionSpec?
+ ->^(TOK_UPDATEINDEX $indexName partitionSpec?)
+ ;
+
dropTableStatement
@init { msgs.push("drop statement"); }
@after { msgs.pop(); }
@@ -1486,6 +1504,8 @@
KW_PARTITIONS : 'PARTITIONS';
KW_TABLE: 'TABLE';
KW_TABLES: 'TABLES';
+KW_INDEX: 'INDEX';
+KW_TYPE: 'TYPE';
KW_FUNCTIONS: 'FUNCTIONS';
KW_SHOW: 'SHOW';
KW_MSCK: 'MSCK';
@@ -1504,6 +1524,7 @@
KW_IS: 'IS';
KW_NULL: 'NULL';
KW_CREATE: 'CREATE';
+KW_UPDATE: 'UPDATE';
KW_EXTERNAL: 'EXTERNAL';
KW_ALTER: 'ALTER';
KW_CHANGE: 'CHANGE';
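
With these rules, a CREATE INDEX statement has the shape CREATE INDEX idx_name TYPE idx_type ON TABLE tab_name (col1, col2, ...), optionally followed by a STORED AS clause, and UPDATE INDEX takes an index name plus an optional PARTITION (...) spec. For example (all names here are placeholders, and the type identifier must be one that HiveIndex.getIndexType() accepts): CREATE INDEX src_key_idx TYPE compact ON TABLE src (key) STORED AS RCFILE; UPDATE INDEX src_key_idx PARTITION (ds='2010-06-01').
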
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java (revision 948363)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java (working copy)
@@ -95,6 +95,8 @@
case HiveParser.TOK_SHOW_TABLESTATUS:
case HiveParser.TOK_SHOWFUNCTIONS:
case HiveParser.TOK_SHOWPARTITIONS:
+ case HiveParser.TOK_CREATEINDEX:
+ case HiveParser.TOK_UPDATEINDEX:
case HiveParser.TOK_ALTERTABLE_FILEFORMAT:
case HiveParser.TOK_ALTERTABLE_CLUSTER_SORT:
case HiveParser.TOK_ALTERTABLE_TOUCH:
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java (revision 948363)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java (working copy)
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.parse;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -35,9 +36,24 @@
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.ql.exec.ExecDriver;
import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.exec.ForwardOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.index.HiveIndex;
+import org.apache.hadoop.hive.ql.index.IndexEntryValueCell;
+import org.apache.hadoop.hive.ql.index.HiveIndex.IndexType;
+import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
import org.apache.hadoop.hive.ql.plan.DDLWork;
@@ -45,16 +61,20 @@
import org.apache.hadoop.hive.ql.plan.DescTableDesc;
import org.apache.hadoop.hive.ql.plan.DropTableDesc;
import org.apache.hadoop.hive.ql.plan.FetchWork;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.MsckDesc;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.ShowFunctionsDesc;
import org.apache.hadoop.hive.ql.plan.ShowPartitionsDesc;
import org.apache.hadoop.hive.ql.plan.ShowTableStatusDesc;
import org.apache.hadoop.hive.ql.plan.ShowTablesDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.TouchDesc;
+import org.apache.hadoop.hive.ql.plan.createIndexDesc;
import org.apache.hadoop.hive.ql.plan.AlterTableDesc.AlterTableTypes;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
/**
@@ -95,6 +115,10 @@
public void analyzeInternal(ASTNode ast) throws SemanticException {
if (ast.getToken().getType() == HiveParser.TOK_DROPTABLE) {
analyzeDropTable(ast, false);
+ } else if (ast.getToken().getType() == HiveParser.TOK_CREATEINDEX) {
+ analyzeCreateIndex(ast);
+ } else if (ast.getToken().getType() == HiveParser.TOK_UPDATEINDEX) {
+ analyzeUpdateIndex(ast);
} else if (ast.getToken().getType() == HiveParser.TOK_DESCTABLE) {
ctx.setResFile(new Path(ctx.getLocalTmpFileURI()));
analyzeDescribeTable(ast);
@@ -156,7 +180,211 @@
rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(),
dropTblDesc), conf));
}
+
+ private HashMap<String, String> extractPartitionSpecs(Tree partspec)
+ throws SemanticException {
+ HashMap<String, String> partSpec = new LinkedHashMap<String, String>();
+ for (int i = 0; i < partspec.getChildCount(); ++i) {
+ CommonTree partspec_val = (CommonTree) partspec.getChild(i);
+ String val = stripQuotes(partspec_val.getChild(1).getText());
+ partSpec.put(partspec_val.getChild(0).getText().toLowerCase(), val);
+ }
+ return partSpec;
+ }
+
+
+ private void analyzeCreateIndex(ASTNode ast) throws SemanticException {
+ String indexName = unescapeIdentifier(ast.getChild(0).getText());
+ String typeName = unescapeIdentifier(ast.getChild(1).getText());
+ String tableName = unescapeIdentifier(ast.getChild(2).getText());
+ List<String> indexedCols = getColumnNames((ASTNode) ast.getChild(3));
+ ASTNode fileFormat = (ASTNode) ast.getChild(4);
+ String serde = null;
+ String inputFormat = null;
+ String outputFormat = null;
+ if (fileFormat != null) {
+ switch (fileFormat.getToken().getType()) {
+ case HiveParser.TOK_TBLSEQUENCEFILE:
+ inputFormat = SEQUENCEFILE_INPUT;
+ outputFormat = SEQUENCEFILE_OUTPUT;
+ break;
+ case HiveParser.TOK_TBLTEXTFILE:
+ inputFormat = TEXTFILE_INPUT;
+ outputFormat = TEXTFILE_OUTPUT;
+ break;
+ case HiveParser.TOK_TABLEFILEFORMAT:
+ inputFormat = unescapeSQLString(fileFormat.getChild(0).getText());
+ outputFormat = unescapeSQLString(fileFormat.getChild(1).getText());
+ break;
+ case HiveParser.TOK_TBLRCFILE:
+ default:
+ inputFormat = RCFILE_INPUT;
+ outputFormat = RCFILE_OUTPUT;
+ serde = COLUMNAR_SERDE;
+ break;
+ }
+ }
+ try {
+ HiveIndex.getIndexType(typeName);
+ } catch (Exception e) {
+ throw new SemanticException(e);
+ }
+ createIndexDesc crtIndexDesc = new createIndexDesc(tableName, indexName,
+ indexedCols, inputFormat, outputFormat, serde, typeName);
+ Task<? extends Serializable> createIndex = TaskFactory.get(new DDLWork(crtIndexDesc), conf);
+ rootTasks.add(createIndex);
+ }
+
+ private void analyzeUpdateIndex(ASTNode ast) throws SemanticException {
+ String indexName = unescapeIdentifier(ast.getChild(0).getText());
+ HashMap<String, String> partSpec = null;
+ Tree part = ast.getChild(1);
+ if (part != null) {
+ partSpec = extractPartitionSpecs(part);
+ }
+ List<Task<? extends Serializable>> indexBuilder = getIndexBuilderMapRed(indexName, partSpec);
+ rootTasks.addAll(indexBuilder);
+ }
+
+ private List<Task<? extends Serializable>> getIndexBuilderMapRed(String indexTableName,
+ HashMap<String, String> partSpec) throws SemanticException {
+ try {
+ Table tbl = db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME,
+ indexTableName);
+ String baseTblName = MetaStoreUtils.getBaseTableNameOfIndexTable(tbl
+ .getTTable());
+ Table baseTbl = db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME,
+ baseTblName);
+ TableDesc desc = Utilities.getTableDesc(tbl);
+ IndexType indexType = HiveIndex.getIndexType(MetaStoreUtils
+ .getIndexType(tbl.getTTable()));
+ boolean compact = false;
+ boolean projection = false;
+ if (indexType.equals(IndexType.COMPACT_SUMMARY_TABLE)) {
+ compact = true;
+ } else if (indexType.equals(IndexType.PROJECTION)) {
+ projection = true;
+ }
+ List<Partition> baseTblPartitions = new ArrayList<Partition>();
+ List<Partition> indexTblPartitions = new ArrayList<Partition>();
+ List<Partition> newBaseTblPartitions = new ArrayList<Partition>();
+ if (partSpec != null) {
+ // if partspec is specified, then only producing index for that
+ // partition
+ Partition part = db.getPartition(baseTbl, partSpec, false);
+ Partition indexPart = db.getPartition(tbl, partSpec, false);
+ baseTblPartitions.add(part);
+ indexTblPartitions.add(indexPart);
+ } else if (baseTbl.isPartitioned()) {
+ // if no partition is specified, create indexes for all partitions one
+ // by one.
+ baseTblPartitions = db.getPartitions(baseTbl);
+ indexTblPartitions = db.getPartitions(tbl);
+ }
+ List<Task<? extends Serializable>> indexBuilderTasks = new ArrayList<Task<? extends Serializable>>();
+ String indexCols = MetaStoreUtils.getColumnNamesFromFieldSchema(tbl
+ .getCols());
+ if (!baseTbl.isPartitioned()) {
+ // the table does not have any partition, then create index for the
+ // whole table
+ Task<? extends Serializable> indexBuilder = getIndexBuilderMapRedTask(indexCols, tbl
+ .getDataLocation().toString(), new PartitionDesc(desc, null),
+ new PartitionDesc(Utilities.getTableDesc(baseTbl), null),
+ baseTbl.getDataLocation().toString(), compact, projection, null);
+ indexBuilderTasks.add(indexBuilder);
+ } else {
+
+ // check whether the index table partitions are still exists in base
+ // table
+ for (int i = 0; i < indexTblPartitions.size(); i++) {
+ Partition indexPart = indexTblPartitions.get(i);
+ Partition basePart = null;
+ for (int j = 0; j < baseTblPartitions.size(); j++) {
+ if (baseTblPartitions.get(j).getName().equals(indexPart.getName())) {
+ basePart = baseTblPartitions.get(j);
+ newBaseTblPartitions.add(baseTblPartitions.get(j));
+ break;
+ }
+ }
+ if (basePart == null)
+ throw new RuntimeException(
+ "Partitions of base table and index table are inconsistent.");
+ // for each partition, spawn a map reduce task?
+ Task<? extends Serializable> indexBuilder = getIndexBuilderMapRedTask(indexCols,
+ indexTblPartitions.get(i).getDataLocation().toString(),
+ new PartitionDesc(desc, indexTblPartitions.get(i).getSpec()),
+ new PartitionDesc(basePart),
+ newBaseTblPartitions.get(i).getDataLocation().toString(),
+ compact, projection, null);
+ indexBuilderTasks.add(indexBuilder);
+ }
+
+ }
+ return indexBuilderTasks;
+ } catch (HiveException e) {
+ throw new SemanticException(e);
+ }
+ }
+
+ private Task<? extends Serializable> getIndexBuilderMapRedTask(
+ String indexCols, String destOutputPath, PartitionDesc indexPartDesc,
+ PartitionDesc basePartDesc, String inputPath, boolean compact,
+ boolean projection, Class<? extends OutputFormat> outputFormat) {
+ MapredWork work = new MapredWork();
+ work.setIndexTableDesc(indexPartDesc.getTableDesc());
+ work.setKeyDesc(basePartDesc.getTableDesc());
+ String aliasString = "building-index";
+ LinkedHashMap<String, ArrayList<String>> pathToAliases = new LinkedHashMap<String, ArrayList<String>>();
+ ArrayList<String> alias = new ArrayList<String>();
+ alias.add(aliasString);
+ pathToAliases.put(inputPath, alias);
+ work.setPathToAliases(pathToAliases);
+ LinkedHashMap<String, PartitionDesc> aliasToPartnInfo = new LinkedHashMap<String, PartitionDesc>();
+ aliasToPartnInfo.put(aliasString, basePartDesc);
+ work.setAliasToPartnInfo(aliasToPartnInfo);
+ LinkedHashMap<String, Operator<? extends Serializable>> aliasToWork = new LinkedHashMap<String, Operator<? extends Serializable>>();
+ aliasToWork.put(aliasString, new ForwardOperator());
+ work.setAliasToWork(aliasToWork);
+ LinkedHashMap<String, PartitionDesc> pathToPart = new LinkedHashMap<String, PartitionDesc>();
+ pathToPart.put(inputPath, basePartDesc);
+ work.setPathToPartitionInfo(pathToPart);
+ work.setIndexCols(indexCols);
+ work.setNumReduceTasks(-1);
+ ExecDriver exec = new ExecDriver();
+ if (!projection) {
+ work.setMapperClass(
+ org.apache.hadoop.hive.ql.index.IndexBuilderSummaryMapper.class);
+ if (compact) {
+ work.setReducerClass(
+ org.apache.hadoop.hive.ql.index.IndexBuilderCompactSumReducer.class);
+ } else {
+ work.setReducerClass(
+ org.apache.hadoop.hive.ql.index.IndexBuilderSummaryReducer.class);
+ }
+ } else {
+ work.setMapperClass(
+ org.apache.hadoop.hive.ql.index.IndexBuilderProjectMapper.class);
+ work.setNumReduceTasks(0);
+ }
+
+ work.setMapOutputKeyClass(HiveKey.class);
+ work.setMapOutputValueClass(IndexEntryValueCell.class);
+ work.setCompressed(this.conf.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT));
+ work.setCompressCodec(this.conf
+ .getVar(HiveConf.ConfVars.COMPRESSINTERMEDIATECODEC));
+ work.setOutputPath(destOutputPath);
+
+ exec.setDelOutputIfExists(true);
+ work.setInputFormatCls(
+ org.apache.hadoop.hive.ql.index.IndexBuilderFileFormat.class);
+ if (outputFormat == null) {
+ outputFormat = RCFileOutputFormat.class;
+ }
+ work.setOutputFormatCls(outputFormat);
+ exec.setWork(work);
+ exec.setId("Stage-0");
+ return exec;
+ }
+
private void analyzeAlterTableProps(ASTNode ast, boolean expectView)
throws SemanticException {