diff --git beeline/src/java/org/apache/hive/beeline/Commands.java beeline/src/java/org/apache/hive/beeline/Commands.java
index def26b6..291adba 100644
--- beeline/src/java/org/apache/hive/beeline/Commands.java
+++ beeline/src/java/org/apache/hive/beeline/Commands.java
@@ -780,6 +780,7 @@ private boolean execute(String line, boolean call) {
logThread.start();
hasResults = stmnt.execute(sql);
logThread.interrupt();
+ logThread.join(DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT);
}
}
@@ -847,6 +848,7 @@ public void run() {
return;
} catch (InterruptedException e) {
beeLine.debug("Getting log thread is interrupted, since query is done!");
+ showRemainingLogsIfAny(hiveStatement);
return;
}
}
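
The two Commands.java hunks above tighten the shutdown of Beeline's log-polling thread: once the statement returns, the thread is interrupted, the caller waits up to DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT for it to exit, and the interrupt handler drains any log lines still pending via showRemainingLogsIfAny(). Below is a minimal stand-alone sketch of that interrupt/join/drain pattern, not the actual Beeline code; LogPollerSketch, pendingLogs, drain() and JOIN_TIMEOUT_MS are illustrative names, with JOIN_TIMEOUT_MS standing in for DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT.

public final class LogPollerSketch {
  private static final long JOIN_TIMEOUT_MS = 10000L; // stand-in for DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT

  public static void main(String[] args) throws Exception {
    final java.util.concurrent.BlockingQueue<String> pendingLogs =
        new java.util.concurrent.LinkedBlockingQueue<String>();
    pendingLogs.add("Map 1: 50% ...");

    Thread logThread = new Thread(new Runnable() {
      @Override
      public void run() {
        while (true) {
          try {
            Thread.sleep(100L);      // poll interval while the query is running
            drain(pendingLogs);      // periodic fetch of queued log lines
          } catch (InterruptedException e) {
            drain(pendingLogs);      // analogous to showRemainingLogsIfAny(hiveStatement)
            return;
          }
        }
      }
    });

    logThread.start();
    // ... the query executes here ...
    logThread.interrupt();           // query done: stop polling
    logThread.join(JOIN_TIMEOUT_MS); // bound the wait, mirroring the added join() call
  }

  private static void drain(java.util.Queue<String> logs) {
    for (String line; (line = logs.poll()) != null; ) {
      System.err.println(line);
    }
  }
}
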
diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index e19e43f..cbdbdda 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2493,12 +2493,6 @@ private void initialize(Class<?> cls) {
// Overlay the values of any system properties whose names appear in the list of ConfVars
applySystemProperties();
- if(this.get("hive.metastore.local", null) != null) {
- l4j.warn("DEPRECATED: Configuration property hive.metastore.local no longer has any " +
- "effect. Make sure to provide a valid value for hive.metastore.uris if you are " +
- "connecting to a remote metastore.");
- }
-
if ((this.get("hive.metastore.ds.retry.attempts") != null) ||
this.get("hive.metastore.ds.retry.interval") != null) {
l4j.warn("DEPRECATED: hive.metastore.ds.retry.* no longer has any effect. " +
diff --git contrib/src/test/results/clientpositive/udaf_example_group_concat.q.out contrib/src/test/results/clientpositive/udaf_example_group_concat.q.out
index 4123c5a..856dcfe 100644
--- contrib/src/test/results/clientpositive/udaf_example_group_concat.q.out
+++ contrib/src/test/results/clientpositive/udaf_example_group_concat.q.out
@@ -26,11 +26,11 @@ STAGE PLANS:
alias: src
Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
Select Operator
- expressions: substr(value, 5, 1) (type: string), '(' (type: string), key (type: string), ':' (type: string), value (type: string), ')' (type: string)
- outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5
+ expressions: substr(value, 5, 1) (type: string), key (type: string), value (type: string)
+ outputColumnNames: _col0, _col2, _col4
Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
Group By Operator
- aggregations: example_group_concat(_col1, _col2, _col3, _col4, _col5)
+ aggregations: example_group_concat('(', _col2, ':', _col4, ')')
keys: _col0 (type: string)
mode: hash
outputColumnNames: _col0, _col1
diff --git contrib/src/test/results/clientpositive/udaf_example_max_n.q.out contrib/src/test/results/clientpositive/udaf_example_max_n.q.out
index 4e911ed..05e0028 100644
--- contrib/src/test/results/clientpositive/udaf_example_max_n.q.out
+++ contrib/src/test/results/clientpositive/udaf_example_max_n.q.out
@@ -26,11 +26,11 @@ STAGE PLANS:
alias: src
Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
Select Operator
- expressions: substr(value, 5) (type: string), 10 (type: int), if((UDFToDouble(substr(value, 5)) > 250.0), null, substr(value, 5)) (type: string)
- outputColumnNames: _col0, _col1, _col2
+ expressions: substr(value, 5) (type: string), if((UDFToDouble(substr(value, 5)) > 250.0), null, substr(value, 5)) (type: string)
+ outputColumnNames: _col0, _col2
Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
Group By Operator
- aggregations: example_max_n(_col0, _col1), example_max_n(_col2, _col1)
+ aggregations: example_max_n(_col0, 10), example_max_n(_col2, 10)
mode: hash
outputColumnNames: _col0, _col1
Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
diff --git contrib/src/test/results/clientpositive/udaf_example_min_n.q.out contrib/src/test/results/clientpositive/udaf_example_min_n.q.out
index c732838..aba1569 100644
--- contrib/src/test/results/clientpositive/udaf_example_min_n.q.out
+++ contrib/src/test/results/clientpositive/udaf_example_min_n.q.out
@@ -26,11 +26,11 @@ STAGE PLANS:
alias: src
Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
Select Operator
- expressions: substr(value, 5) (type: string), 10 (type: int), if((UDFToDouble(substr(value, 5)) < 250.0), null, substr(value, 5)) (type: string)
- outputColumnNames: _col0, _col1, _col2
+ expressions: substr(value, 5) (type: string), if((UDFToDouble(substr(value, 5)) < 250.0), null, substr(value, 5)) (type: string)
+ outputColumnNames: _col0, _col2
Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
Group By Operator
- aggregations: example_min_n(_col0, _col1), example_min_n(_col2, _col1)
+ aggregations: example_min_n(_col0, 10), example_min_n(_col2, 10)
mode: hash
outputColumnNames: _col0, _col1
Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
diff --git data/files/datatypes.txt data/files/datatypes.txt
index 0228a27..458c5bd 100644
--- data/files/datatypes.txt
+++ data/files/datatypes.txt
@@ -1,3 +1,3 @@
-\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N
--1false-1.1\N\N\N-1-1-1.0-1\N\N\N\N\N\N\N\N
+\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N
+-1false-1.1\N\N\N-1-1-1.0-1\N\N\N\N\N\N\N\N\N
1true1.11121x2ykva92.2111.01abcd1111213142212212x1abcd22012-04-22 09:00:00.123456789123456789.0123456YWJjZA==2013-01-01abc123abc123X'01FF'
diff --git doap_Hive.rdf doap_Hive.rdf
index b773570..e69de29 100644
--- doap_Hive.rdf
+++ doap_Hive.rdf
@@ -1,58 +0,0 @@
-
-
-
-
-
- 2011-11-09
-
- Apache Hive
-
-
- The Apache Hive (TM) data warehouse software facilitates querying and managing large datasets residing in distributed storage.
- The Apache Hive (TM) data warehouse software facilitates querying and managing large datasets residing in distributed storage. Built on top of Apache Hadoop (TM), it provides
-
-* tools to enable easy data extract/transform/load (ETL)
-* a mechanism to impose structure on a variety of data formats
-* access to files stored either directly in Apache HDFS (TM) or in other data storage systems such as Apache HBase (TM)
-* query execution via MapReduce
-
-Hive defines a simple SQL-like query language, called HiveQL, that enables users familiar with SQL to query the data. At the same time, this language also allows programmers who are familiar with the MapReduce framework to be able to plug in their custom mappers and reducers to perform more sophisticated analysis that may not be supported by the built-in capabilities of the language. HiveQL can also be extended with custom scalar functions (UDF's), aggregations (UDAF's), and table functions (UDTF's).
-
-
-
-
- Java
-
-
-
-
-
-
-
-
-
- John Sichi
-
-
-
-
-
diff --git itests/util/src/main/java/org/apache/hadoop/hive/ql/hooks/CheckColumnAccessHook.java itests/util/src/main/java/org/apache/hadoop/hive/ql/hooks/CheckColumnAccessHook.java
index f6058e4..adbb531 100644
--- itests/util/src/main/java/org/apache/hadoop/hive/ql/hooks/CheckColumnAccessHook.java
+++ itests/util/src/main/java/org/apache/hadoop/hive/ql/hooks/CheckColumnAccessHook.java
@@ -20,7 +20,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -61,9 +61,8 @@ public void run(HookContext hookContext) {
Map<String, List<String>> tableToColumnAccessMap =
columnAccessInfo.getTableToColumnAccessMap();
- // We need a new map to ensure output is always produced in the same order.
- // This makes tests that use this hook deterministic.
- Map<String, String> outputOrderedMap = new HashMap<String, String>();
+ // Must be deterministic order map for consistent test output across Java versions
+ Map<String, String> outputOrderedMap = new LinkedHashMap<String, String>();
for (Map.Entry<String, List<String>> tableAccess : tableToColumnAccessMap.entrySet()) {
StringBuilder perTableInfo = new StringBuilder();
diff --git itests/util/src/main/java/org/apache/hadoop/hive/ql/hooks/CheckTableAccessHook.java itests/util/src/main/java/org/apache/hadoop/hive/ql/hooks/CheckTableAccessHook.java
index 8e19fad..7d1fc3d 100644
--- itests/util/src/main/java/org/apache/hadoop/hive/ql/hooks/CheckTableAccessHook.java
+++ itests/util/src/main/java/org/apache/hadoop/hive/ql/hooks/CheckTableAccessHook.java
@@ -19,7 +19,7 @@
import java.util.List;
import java.util.Map;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -61,9 +61,8 @@ public void run(HookContext hookContext) {
Map<Operator<? extends OperatorDesc>, Map<String, List<String>>> operatorToTableAccessMap =
tableAccessInfo.getOperatorToTableAccessMap();
- // We need a new map to ensure output is always produced in the same order.
- // This makes tests that use this hook deterministic.
- Map<String, String> outputOrderedMap = new HashMap<String, String>();
+ // Must be deterministic order map for consistent q-test output across Java versions
+ Map<String, String> outputOrderedMap = new LinkedHashMap<String, String>();
for (Map.Entry<Operator<? extends OperatorDesc>, Map<String, List<String>>> tableAccess:
operatorToTableAccessMap.entrySet()) {
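
Both hooks above replace HashMap with LinkedHashMap because HashMap's iteration order is an implementation detail that changed between JDK releases (notably from JDK 7 to JDK 8), while LinkedHashMap always iterates in insertion order, which keeps the q-test output byte-for-byte stable. The following is a small self-contained illustration of the difference; it is not the hook code itself, and the keys and values are made up.

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class MapOrderDemo {
  public static void main(String[] args) {
    Map<String, String> unordered = new HashMap<String, String>();
    Map<String, String> ordered = new LinkedHashMap<String, String>();
    for (String key : new String[] {"default@t1", "default@t2", "default@t3"}) {
      unordered.put(key, "Columns:a,b");
      ordered.put(key, "Columns:a,b");
    }
    // HashMap: order depends on hashing and capacity, and may differ across JVM versions.
    System.out.println(unordered.keySet());
    // LinkedHashMap: always [default@t1, default@t2, default@t3], the insertion order.
    System.out.println(ordered.keySet());
  }
}
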
diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index d12dfe5..fa40082 100644
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -222,7 +222,7 @@ public static Schema getSchema(BaseSemanticAnalyzer sem, HiveConf conf) {
String tableName = "result";
List<FieldSchema> lst = null;
try {
- lst = MetaStoreUtils.getFieldsFromDeserializer(tableName, td.getDeserializer());
+ lst = MetaStoreUtils.getFieldsFromDeserializer(tableName, td.getDeserializer(conf));
} catch (Exception e) {
LOG.warn("Error getting schema: "
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
index 6338c3c..0ccab02 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
@@ -26,8 +26,10 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import org.apache.commons.lang3.StringEscapeUtils;
+import com.google.common.collect.Iterators;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
@@ -49,14 +51,13 @@
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.hive.serde2.Deserializer;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.SerDeUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.DelegatedObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.SerDeSpec;
import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
@@ -70,74 +71,81 @@
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.common.util.AnnotationUtils;
/**
* FetchTask implementation.
**/
public class FetchOperator implements Serializable {
- static Log LOG = LogFactory.getLog(FetchOperator.class.getName());
- static LogHelper console = new LogHelper(LOG);
+ static final Log LOG = LogFactory.getLog(FetchOperator.class.getName());
+ static final LogHelper console = new LogHelper(LOG);
public static final String FETCH_OPERATOR_DIRECTORY_LIST =
"hive.complete.dir.list";
- private boolean isNativeTable;
private FetchWork work;
- protected Operator<?> operator; // operator tree for processing row further (option)
- private int splitNum;
- private PartitionDesc currPart;
- private TableDesc currTbl;
- private boolean tblDataDone;
- private FooterBuffer footerBuffer = null;
- private int headerCount = 0;
- private int footerCount = 0;
-
- private boolean hasVC;
- private boolean isPartitioned;
+ private Operator<?> operator; // operator tree for processing row further (optional)
+
+ private final boolean hasVC;
+ private final boolean isStatReader;
+ private final boolean isPartitioned;
+ private final boolean isNonNativeTable;
private StructObjectInspector vcsOI;
private List<VirtualColumn> vcCols;
private ExecMapperContext context;
+ private transient Deserializer tableSerDe;
+ private transient StructObjectInspector tableOI;
+ private transient StructObjectInspector partKeyOI;
+ private transient StructObjectInspector convertedOI;
+
+ private transient Iterator<Path> iterPath;
+ private transient Iterator<PartitionDesc> iterPartDesc;
+ private transient Iterator<FetchInputFormatSplit> iterSplits = Iterators.emptyIterator();
+
+ private transient Path currPath;
+ private transient PartitionDesc currDesc;
+ private transient Deserializer currSerDe;
+ private transient Converter ObjectConverter;
private transient RecordReader currRecReader;
- private transient FetchInputFormatSplit[] inputSplits;
- private transient InputFormat inputFormat;
+
private transient JobConf job;
private transient WritableComparable key;
private transient Writable value;
private transient Object[] vcValues;
- private transient Deserializer serde;
- private transient Deserializer tblSerde;
- private transient Converter partTblObjectInspectorConverter;
- private transient Iterator<Path> iterPath;
- private transient Iterator<PartitionDesc> iterPartDesc;
- private transient Path currPath;
- private transient StructObjectInspector objectInspector;
- private transient StructObjectInspector rowObjectInspector;
- private transient ObjectInspector partitionedTableOI;
+ private transient int headerCount;
+ private transient int footerCount;
+ private transient FooterBuffer footerBuffer;
+
+ private transient StructObjectInspector outputOI;
private transient Object[] row;
- public FetchOperator() {
- }
-
- public FetchOperator(FetchWork work, JobConf job) {
- this.job = job;
- this.work = work;
- initialize();
+ public FetchOperator(FetchWork work, JobConf job) throws HiveException {
+ this(work, job, null, null);
}
public FetchOperator(FetchWork work, JobConf job, Operator<?> operator,
- List<VirtualColumn> vcCols) {
+ List<VirtualColumn> vcCols) throws HiveException {
this.job = job;
this.work = work;
this.operator = operator;
this.vcCols = vcCols;
+ this.hasVC = vcCols != null && !vcCols.isEmpty();
+ this.isStatReader = work.getTblDesc() == null;
+ this.isPartitioned = !isStatReader && work.isPartitioned();
+ this.isNonNativeTable = !isStatReader && work.getTblDesc().isNonNative();
initialize();
}
- private void initialize() {
- if (hasVC = vcCols != null && !vcCols.isEmpty()) {
+ private void initialize() throws HiveException {
+ if (isStatReader) {
+ outputOI = work.getStatRowOI();
+ return;
+ }
+ if (hasVC) {
List<String> names = new ArrayList<String>(vcCols.size());
List<ObjectInspector> inspectors = new ArrayList<ObjectInspector>(vcCols.size());
for (VirtualColumn vc : vcCols) {
@@ -147,8 +155,6 @@ private void initialize() {
vcsOI = ObjectInspectorFactory.getStandardStructObjectInspector(names, inspectors);
vcValues = new Object[vcCols.size()];
}
- isPartitioned = work.isPartitioned();
- tblDataDone = false;
if (hasVC && isPartitioned) {
row = new Object[3];
} else if (hasVC || isPartitioned) {
@@ -156,21 +162,27 @@ private void initialize() {
} else {
row = new Object[1];
}
- if (work.getTblDesc() != null) {
- isNativeTable = !work.getTblDesc().isNonNative();
+ if (isPartitioned) {
+ iterPath = work.getPartDir().iterator();
+ iterPartDesc = work.getPartDesc().iterator();
} else {
- isNativeTable = true;
+ iterPath = Arrays.asList(work.getTblDir()).iterator();
+ iterPartDesc = Iterators.cycle(new PartitionDesc(work.getTblDesc(), null));
}
- setupExecContext();
+ outputOI = setupOutputObjectInspector();
+ context = setupExecContext(operator, work.getPathLists());
}
- private void setupExecContext() {
+ private ExecMapperContext setupExecContext(Operator operator, List<Path> paths) {
+ ExecMapperContext context = null;
if (hasVC || work.getSplitSample() != null) {
context = new ExecMapperContext(job);
if (operator != null) {
operator.setExecContext(context);
}
}
+ setFetchOperatorContext(job, paths);
+ return context;
}
public FetchWork getWork() {
@@ -181,42 +193,6 @@ public void setWork(FetchWork work) {
this.work = work;
}
- public int getSplitNum() {
- return splitNum;
- }
-
- public void setSplitNum(int splitNum) {
- this.splitNum = splitNum;
- }
-
- public PartitionDesc getCurrPart() {
- return currPart;
- }
-
- public void setCurrPart(PartitionDesc currPart) {
- this.currPart = currPart;
- }
-
- public TableDesc getCurrTbl() {
- return currTbl;
- }
-
- public void setCurrTbl(TableDesc currTbl) {
- this.currTbl = currTbl;
- }
-
- public boolean isTblDataDone() {
- return tblDataDone;
- }
-
- public void setTblDataDone(boolean tblDataDone) {
- this.tblDataDone = tblDataDone;
- }
-
- public boolean isEmptyTable() {
- return work.getTblDir() == null && (work.getPartDir() == null || work.getPartDir().isEmpty());
- }
-
/**
* A cache of InputFormat instances.
*/
@@ -243,146 +219,54 @@ static InputFormat getInputFormatFromCache(Class<? extends InputFormat> inputFor
return format;
}
- private StructObjectInspector getRowInspectorFromTable(TableDesc table) throws Exception {
- Deserializer serde = table.getDeserializerClass().newInstance();
- SerDeUtils.initializeSerDeWithoutErrorCheck(serde, job, table.getProperties(), null);
- return createRowInspector(getStructOIFrom(serde.getObjectInspector()));
- }
-
- private StructObjectInspector getRowInspectorFromPartition(PartitionDesc partition,
- ObjectInspector partitionOI) throws Exception {
-
- String pcols = partition.getTableDesc().getProperties().getProperty(
+ private StructObjectInspector getPartitionKeyOI(TableDesc tableDesc) throws Exception {
+ String pcols = tableDesc.getProperties().getProperty(
org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
- String[] partKeys = pcols.trim().split("/");
- String pcolTypes = partition.getTableDesc().getProperties().getProperty(
- org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);
- String[] partKeyTypes = pcolTypes.trim().split(":");
- row[1] = createPartValue(partKeys, partition.getPartSpec(), partKeyTypes);
+ String pcolTypes = tableDesc.getProperties().getProperty(
+ org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);
- return createRowInspector(getStructOIFrom(partitionOI), partKeys, partKeyTypes);
- }
-
- private StructObjectInspector getRowInspectorFromPartitionedTable(TableDesc table)
- throws Exception {
- Deserializer serde = table.getDeserializerClass().newInstance();
- SerDeUtils.initializeSerDe(serde, job, table.getProperties(), null);
- String pcols = table.getProperties().getProperty(
- org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
String[] partKeys = pcols.trim().split("/");
- String pcolTypes = table.getProperties().getProperty(
- org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);
- String[] partKeyTypes = pcolTypes.trim().split(":");
- row[1] = null;
- return createRowInspector(getStructOIFrom(serde.getObjectInspector()), partKeys, partKeyTypes);
- }
-
- private StructObjectInspector getStructOIFrom(ObjectInspector current) throws SerDeException {
- if (objectInspector != null) {
- current = DelegatedObjectInspectorFactory.reset(objectInspector, current);
- } else {
- current = DelegatedObjectInspectorFactory.wrap(current);
- }
- return objectInspector = (StructObjectInspector) current;
- }
-
- private StructObjectInspector createRowInspector(StructObjectInspector current)
- throws SerDeException {
- return hasVC ? ObjectInspectorFactory.getUnionStructObjectInspector(
- Arrays.asList(current, vcsOI)) : current;
- }
-
- private StructObjectInspector createRowInspector(StructObjectInspector current, String[] partKeys, String[] partKeyTypes)
- throws SerDeException {
- List<String> partNames = new ArrayList<String>();
- List<ObjectInspector> partObjectInspectors = new ArrayList<ObjectInspector>();
+ String[] partKeyTypes = pcolTypes.trim().split(":");
+ ObjectInspector[] inspectors = new ObjectInspector[partKeys.length];
for (int i = 0; i < partKeys.length; i++) {
- String key = partKeys[i];
- partNames.add(key);
- ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(
+ inspectors[i] = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(
TypeInfoFactory.getPrimitiveTypeInfo(partKeyTypes[i]));
- partObjectInspectors.add(oi);
}
- StructObjectInspector partObjectInspector = ObjectInspectorFactory
- .getStandardStructObjectInspector(partNames, partObjectInspectors);
-
- return ObjectInspectorFactory.getUnionStructObjectInspector(
- hasVC ? Arrays.asList(current, partObjectInspector, vcsOI) :
- Arrays.asList(current, partObjectInspector));
- }
-
- private Object[] createPartValue(String[] partKeys, Map<String, String> partSpec, String[] partKeyTypes) {
- Object[] partValues = new Object[partKeys.length];
- for (int i = 0; i < partKeys.length; i++) {
- String key = partKeys[i];
- ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(
- TypeInfoFactory.getPrimitiveTypeInfo(partKeyTypes[i]));
- partValues[i] =
- ObjectInspectorConverters.
- getConverter(PrimitiveObjectInspectorFactory.
- javaStringObjectInspector, oi).convert(partSpec.get(key));
+ return ObjectInspectorFactory.getStandardStructObjectInspector(
+ Arrays.asList(partKeys), Arrays.asList(inspectors));
+ }
+
+ private Object[] createPartValue(PartitionDesc partDesc, StructObjectInspector partOI) {
+ Map<String, String> partSpec = partDesc.getPartSpec();
+ List<? extends StructField> fields = partOI.getAllStructFieldRefs();
+ Object[] partValues = new Object[fields.size()];
+ for (int i = 0; i < partValues.length; i++) {
+ StructField field = fields.get(i);
+ String value = partSpec.get(field.getFieldName());
+ ObjectInspector oi = field.getFieldObjectInspector();
+ partValues[i] = ObjectInspectorConverters.getConverter(
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector, oi).convert(value);
}
return partValues;
}
- private void getNextPath() throws Exception {
- // first time
- if (iterPath == null) {
- if (work.isNotPartitioned()) {
- if (!tblDataDone) {
- currPath = work.getTblDir();
- currTbl = work.getTblDesc();
- if (isNativeTable) {
- FileSystem fs = currPath.getFileSystem(job);
- if (fs.exists(currPath)) {
- FileStatus[] fStats = listStatusUnderPath(fs, currPath);
- for (FileStatus fStat : fStats) {
- if (fStat.getLen() > 0) {
- tblDataDone = true;
- break;
- }
- }
- }
- } else {
- tblDataDone = true;
- }
-
- if (!tblDataDone) {
- currPath = null;
- }
- return;
- } else {
- currTbl = null;
- currPath = null;
- }
- return;
- } else {
- setFetchOperatorContext(job, work.getPartDir());
- iterPath = work.getPartDir().iterator();
- iterPartDesc = work.getPartDesc().iterator();
- }
- }
-
+ private boolean getNextPath() throws Exception {
while (iterPath.hasNext()) {
- Path nxt = iterPath.next();
- PartitionDesc prt = null;
- if (iterPartDesc != null) {
- prt = iterPartDesc.next();
+ currPath = iterPath.next();
+ currDesc = iterPartDesc.next();
+ if (isNonNativeTable) {
+ return true;
}
- FileSystem fs = nxt.getFileSystem(job);
- if (fs.exists(nxt)) {
- FileStatus[] fStats = listStatusUnderPath(fs, nxt);
- for (FileStatus fStat : fStats) {
+ FileSystem fs = currPath.getFileSystem(job);
+ if (fs.exists(currPath)) {
+ for (FileStatus fStat : listStatusUnderPath(fs, currPath)) {
if (fStat.getLen() > 0) {
- currPath = nxt;
- if (iterPartDesc != null) {
- currPart = prt;
- }
- return;
+ return true;
}
}
}
}
+ return false;
}
/**
@@ -390,119 +274,53 @@ private void getNextPath() throws Exception {
* This helps InputFormats make decisions based on the scope of the complete
* operation.
* @param conf the configuration to modify
- * @param partDirs the list of partition directories
+ * @param paths the list of input directories
*/
- static void setFetchOperatorContext(JobConf conf,
- ArrayList<Path> partDirs) {
- if (partDirs != null) {
+ static void setFetchOperatorContext(JobConf conf, List<Path> paths) {
+ if (paths != null) {
StringBuilder buff = new StringBuilder();
- boolean first = true;
- for(Path p: partDirs) {
- if (first) {
- first = false;
- } else {
+ for (Path path : paths) {
+ if (buff.length() > 0) {
buff.append('\t');
}
- buff.append(StringEscapeUtils.escapeJava(p.toString()));
+ buff.append(StringEscapeUtils.escapeJava(path.toString()));
}
conf.set(FETCH_OPERATOR_DIRECTORY_LIST, buff.toString());
}
}
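
setFetchOperatorContext() above publishes every input directory of the fetch as a Java-escaped, tab-separated list under hive.complete.dir.list (FETCH_OPERATOR_DIRECTORY_LIST), so an input format can see the scope of the whole operation rather than a single path. The following is a hypothetical consumer-side sketch, not part of this patch; FetchDirListReader and readDirectoryList() are illustrative names showing how such a list could be read back.

import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.hadoop.mapred.JobConf;

public class FetchDirListReader {
  public static String[] readDirectoryList(JobConf conf) {
    String encoded = conf.get("hive.complete.dir.list", "");
    if (encoded.isEmpty()) {
      return new String[0];
    }
    String[] dirs = encoded.split("\t");
    for (int i = 0; i < dirs.length; i++) {
      // Undo the StringEscapeUtils.escapeJava() applied on the writer side.
      dirs[i] = StringEscapeUtils.unescapeJava(dirs[i]);
    }
    return dirs;
  }
}
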
- /**
- * A cache of Object Inspector Settable Properties.
- */
- private static Map<ObjectInspector, Boolean> oiSettableProperties = new HashMap<ObjectInspector, Boolean>();
-
private RecordReader getRecordReader() throws Exception {
- if (currPath == null) {
- getNextPath();
- if (currPath == null) {
+ if (!iterSplits.hasNext()) {
+ FetchInputFormatSplit[] splits = getNextSplits();
+ if (splits == null) {
return null;
}
-
- // not using FileInputFormat.setInputPaths() here because it forces a
- // connection
- // to the default file system - which may or may not be online during pure
- // metadata
- // operations
- job.set("mapred.input.dir", org.apache.hadoop.util.StringUtils.escapeString(currPath
- .toString()));
-
- // Fetch operator is not vectorized and as such turn vectorization flag off so that
- // non-vectorized record reader is created below.
- if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) {
- HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
- }
-
- PartitionDesc partDesc;
- if (currTbl == null) {
- partDesc = currPart;
+ if (!isPartitioned || convertedOI == null) {
+ currSerDe = tableSerDe;
+ ObjectConverter = null;
} else {
- partDesc = new PartitionDesc(currTbl, null);
- }
-
- Class<? extends InputFormat> formatter = partDesc.getInputFileFormatClass();
- inputFormat = getInputFormatFromCache(formatter, job);
- Utilities.copyTableJobPropertiesToConf(partDesc.getTableDesc(), job);
- InputSplit[] splits = inputFormat.getSplits(job, 1);
- FetchInputFormatSplit[] inputSplits = new FetchInputFormatSplit[splits.length];
- for (int i = 0; i < splits.length; i++) {
- inputSplits[i] = new FetchInputFormatSplit(splits[i], formatter.getName());
- }
- if (work.getSplitSample() != null) {
- inputSplits = splitSampling(work.getSplitSample(), inputSplits);
- }
- this.inputSplits = inputSplits;
-
- splitNum = 0;
- serde = partDesc.getDeserializer(job);
- SerDeUtils.initializeSerDe(serde, job, partDesc.getTableDesc().getProperties(),
- partDesc.getProperties());
-
- if (currTbl != null) {
- tblSerde = serde;
+ currSerDe = needConversion(currDesc) ? currDesc.getDeserializer(job) : tableSerDe;
+ ObjectInspector inputOI = currSerDe.getObjectInspector();
+ ObjectConverter = ObjectInspectorConverters.getConverter(inputOI, convertedOI);
}
- else {
- tblSerde = currPart.getTableDesc().getDeserializerClass().newInstance();
- SerDeUtils.initializeSerDe(tblSerde, job, currPart.getTableDesc().getProperties(), null);
+ if (isPartitioned) {
+ row[1] = createPartValue(currDesc, partKeyOI);
}
-
- ObjectInspector outputOI = ObjectInspectorConverters.getConvertedOI(
- serde.getObjectInspector(),
- partitionedTableOI == null ? tblSerde.getObjectInspector() : partitionedTableOI,
- oiSettableProperties);
-
- partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(
- serde.getObjectInspector(), outputOI);
+ iterSplits = Arrays.asList(splits).iterator();
if (LOG.isDebugEnabled()) {
LOG.debug("Creating fetchTask with deserializer typeinfo: "
- + serde.getObjectInspector().getTypeName());
+ + currSerDe.getObjectInspector().getTypeName());
LOG.debug("deserializer properties:\ntable properties: " +
- partDesc.getTableDesc().getProperties() + "\npartition properties: " +
- partDesc.getProperties());
- }
-
- if (currPart != null) {
- getRowInspectorFromPartition(currPart, outputOI);
- }
- }
-
- if (splitNum >= inputSplits.length) {
- if (currRecReader != null) {
- currRecReader.close();
- currRecReader = null;
+ currDesc.getTableDesc().getProperties() + "\npartition properties: " +
+ currDesc.getProperties());
}
- currPath = null;
- return getRecordReader();
}
- final FetchInputFormatSplit target = inputSplits[splitNum];
+ final FetchInputFormatSplit target = iterSplits.next();
@SuppressWarnings("unchecked")
- final RecordReader reader =
- inputFormat.getRecordReader(target.getInputSplit(), job, Reporter.NULL);
+ final RecordReader reader = target.getRecordReader(job);
if (hasVC || work.getSplitSample() != null) {
currRecReader = new HiveRecordReader(reader, job) {
@Override
@@ -517,23 +335,52 @@ public boolean doNext(WritableComparable key, Writable value) throws IOException
}
};
((HiveContextAwareRecordReader)currRecReader).
- initIOContext(target, job, inputFormat.getClass(), reader);
+ initIOContext(target, job, target.inputFormat.getClass(), reader);
} else {
currRecReader = reader;
}
- splitNum++;
key = currRecReader.createKey();
value = currRecReader.createValue();
+ headerCount = footerCount = 0;
return currRecReader;
}
+ protected FetchInputFormatSplit[] getNextSplits() throws Exception {
+ while (getNextPath()) {
+ // not using FileInputFormat.setInputPaths() here because it forces a connection to the
+ // default file system - which may or may not be online during pure metadata operations
+ job.set("mapred.input.dir", StringUtils.escapeString(currPath.toString()));
+
+ // Fetch operator is not vectorized and as such turn vectorization flag off so that
+ // non-vectorized record reader is created below.
+ HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
+
+ Class<? extends InputFormat> formatter = currDesc.getInputFileFormatClass();
+ Utilities.copyTableJobPropertiesToConf(currDesc.getTableDesc(), job);
+ InputFormat inputFormat = getInputFormatFromCache(formatter, job);
+
+ InputSplit[] splits = inputFormat.getSplits(job, 1);
+ FetchInputFormatSplit[] inputSplits = new FetchInputFormatSplit[splits.length];
+ for (int i = 0; i < splits.length; i++) {
+ inputSplits[i] = new FetchInputFormatSplit(splits[i], inputFormat);
+ }
+ if (work.getSplitSample() != null) {
+ inputSplits = splitSampling(work.getSplitSample(), inputSplits);
+ }
+ if (inputSplits.length > 0) {
+ return inputSplits;
+ }
+ }
+ return null;
+ }
+
private FetchInputFormatSplit[] splitSampling(SplitSample splitSample,
FetchInputFormatSplit[] splits) {
long totalSize = 0;
for (FetchInputFormatSplit split: splits) {
totalSize += split.getLength();
}
- List<FetchInputFormatSplit> result = new ArrayList<FetchInputFormatSplit>();
+ List<FetchInputFormatSplit> result = new ArrayList<FetchInputFormatSplit>(splits.length);
long targetSize = splitSample.getTargetSize(totalSize);
int startIndex = splitSample.getSeedNum() % splits.length;
long size = 0;
@@ -557,18 +404,18 @@ public boolean doNext(WritableComparable key, Writable value) throws IOException
* Currently only used by FetchTask.
**/
public boolean pushRow() throws IOException, HiveException {
- if(work.getRowsComputedUsingStats() != null) {
+ if (work.getRowsComputedUsingStats() != null) {
for (List