diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 6a22890..6e87567 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -988,7 +988,7 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "Default file format for CREATE TABLE statement applied to managed tables only. External tables will be \n" +
         "created with format specified by hive.default.fileformat. Leaving this null will result in using hive.default.fileformat \n" +
         "for all tables."),
-    HIVEQUERYRESULTFILEFORMAT("hive.query.result.fileformat", "SequenceFile", new StringSet("TextFile", "SequenceFile", "RCfile"),
+    HIVEQUERYRESULTFILEFORMAT("hive.query.result.fileformat", "SequenceFile", new StringSet("TextFile", "SequenceFile", "RCfile", "Llap"),
         "Default file format for storing result of the query."),
     HIVECHECKFILEFORMAT("hive.fileformat.check", true,
         "Whether to check file format or not when loading data files"),
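Note: with the StringSet validator extended, "Llap" becomes a legal value for hive.query.result.fileformat. A minimal sketch of how a client could check this against the registered validator (assumes only hive-common on the classpath; the class name below is made up for illustration):

import org.apache.hadoop.hive.conf.HiveConf;

public class ResultFormatValidationSketch {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // "Llap" now passes the StringSet check the same way "SequenceFile" or "RCfile" do;
    // an unknown value would make verifyAndSet throw IllegalArgumentException.
    conf.verifyAndSet("hive.query.result.fileformat", "Llap");
    System.out.println(conf.get("hive.query.result.fileformat"));
  }
}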
diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestLlapInputSplit.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestLlapInputSplit.java
new file mode 100644
index 0000000..b8b69a7
--- /dev/null
+++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestLlapInputSplit.java
@@ -0,0 +1,99 @@
+package org.apache.hive.jdbc;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.apache.hadoop.io.Text;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.mapred.SplitLocationInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import static org.junit.Assert.*;
+
+public class TestLlapInputSplit {
+
+  @Test
+  public void testWritable() throws Exception {
+    int splitNum = 88;
+    byte[] planBytes = "0123456789987654321".getBytes();
+    byte[] fragmentBytes = "abcdefghijklmnopqrstuvwxyz".getBytes();
+    SplitLocationInfo[] locations = {
+        new SplitLocationInfo("location1", false),
+        new SplitLocationInfo("location2", false),
+    };
+    ArrayList<FieldSchema> fields = new ArrayList<FieldSchema>();
+    fields.add(new FieldSchema("col1", "string", "comment1"));
+    fields.add(new FieldSchema("col2", "int", "comment2"));
+    HashMap<String, String> properties = new HashMap<String, String>();
+    properties.put("key1", "val1");
+    Schema schema = new Schema(
+        fields,
+        properties);
+
+    org.apache.hadoop.hive.llap.LlapInputSplit split1 = new org.apache.hadoop.hive.llap.LlapInputSplit(
+        splitNum,
+        planBytes,
+        fragmentBytes,
+        locations,
+        schema);
+    ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
+    DataOutputStream dataOut = new DataOutputStream(byteOutStream);
+    split1.write(dataOut);
+    ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
+    DataInputStream dataIn = new DataInputStream(byteInStream);
+    org.apache.hadoop.hive.llap.LlapInputSplit split2 = new org.apache.hadoop.hive.llap.LlapInputSplit();
+    split2.readFields(dataIn);
+
+    // Did we read all the data?
+    assertEquals(0, byteInStream.available());
+
+    checkLlapSplits(split1, split2);
+
+    // Try JDBC LlapInputSplits
+    org.apache.hive.jdbc.LlapInputSplit jdbcSplit1 =
+        new org.apache.hive.jdbc.LlapInputSplit(split1, "org.apache.hadoop.hive.llap.LlapInputFormat");
+    byteOutStream.reset();
+    jdbcSplit1.write(dataOut);
+    byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
+    dataIn = new DataInputStream(byteInStream);
+    org.apache.hive.jdbc.LlapInputSplit jdbcSplit2 = new org.apache.hive.jdbc.LlapInputSplit();
+    jdbcSplit2.readFields(dataIn);
+
+    assertEquals(0, byteInStream.available());
+
+    checkLlapSplits(
+        (org.apache.hadoop.hive.llap.LlapInputSplit) jdbcSplit1.getSplit(),
+        (org.apache.hadoop.hive.llap.LlapInputSplit) jdbcSplit2.getSplit());
+    assertEquals(jdbcSplit1.getInputFormat().getClass(), jdbcSplit2.getInputFormat().getClass());
+  }
+
+  static void checkLlapSplits(
+      org.apache.hadoop.hive.llap.LlapInputSplit split1,
+      org.apache.hadoop.hive.llap.LlapInputSplit split2) throws Exception {
+
+    assertEquals(split1.getSplitNum(), split2.getSplitNum());
+    assertArrayEquals(split1.getPlanBytes(), split2.getPlanBytes());
+    assertArrayEquals(split1.getFragmentBytes(), split2.getFragmentBytes());
+    SplitLocationInfo[] locationInfo1 = split1.getLocationInfo();
+    SplitLocationInfo[] locationInfo2 = split2.getLocationInfo();
+    for (int idx = 0; idx < locationInfo1.length; ++idx) {
+      assertEquals(locationInfo1[idx].getLocation(), locationInfo2[idx].getLocation());
+      assertEquals(locationInfo1[idx].isInMemory(), locationInfo2[idx].isInMemory());
+      assertEquals(locationInfo1[idx].isOnDisk(), locationInfo2[idx].isOnDisk());
+    }
+    assertArrayEquals(split1.getLocations(), split2.getLocations());
+    assertEquals(split1.getSchema(), split2.getSchema());
+  }
+
+}
diff --git jdbc/src/java/org/apache/hive/jdbc/LlapDump.java jdbc/src/java/org/apache/hive/jdbc/LlapDump.java
index b0c0253..a807f6c 100644
--- jdbc/src/java/org/apache/hive/jdbc/LlapDump.java
+++ jdbc/src/java/org/apache/hive/jdbc/LlapDump.java
@@ -94,19 +94,34 @@ public static void main(String[] args) throws Exception {
     LlapInputFormat format = new LlapInputFormat(url, user, pwd, query);
     JobConf job = new JobConf();
 
-    InputSplit[] splits = format.getSplits(job, 1);
-    RecordReader reader = format.getRecordReader(splits[0], job, null);
-    if (reader instanceof LlapRecordReader) {
-      Schema schema = ((LlapRecordReader)reader).getSchema();
-      System.out.println(""+schema);
-    }
-    System.out.println("Results: ");
-    System.out.println("");
+    InputSplit[] splits = format.getSplits(job, 1);
 
-    Text value = reader.createValue();
-    while (reader.next(NullWritable.get(), value)) {
-      System.out.println(value);
+    if (splits.length == 0) {
+      System.out.println("No splits returned - empty scan");
+      System.out.println("Results: ");
+    } else {
+      boolean first = true;
+
+      for (InputSplit s: splits) {
+        RecordReader reader = format.getRecordReader(s, job, null);
+
+        if (reader instanceof LlapRecordReader && first) {
+          Schema schema = ((LlapRecordReader)reader).getSchema();
+          System.out.println(""+schema);
+        }
+
+        if (first) {
+          System.out.println("Results: ");
+          System.out.println("");
+          first = false;
+        }
+
+        Text value = reader.createValue();
+        while (reader.next(NullWritable.get(), value)) {
+          System.out.println(value);
+        }
+      }
     }
   }
 
@@ -116,7 +131,7 @@ static Options createOptions() {
     result.addOption(OptionBuilder
        .withLongOpt("location")
        .withDescription("HS2 url")
-       .hasArg()
+       .hasArg()
        .create('l'));
 
     result.addOption(OptionBuilder
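The new test covers the Writable round trip for both split classes. The serialize/deserialize pattern it repeats inline could be factored into a small helper; a sketch under the same assumptions (class and method names here are made up, not part of the patch):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public final class WritableRoundTrip {

  private WritableRoundTrip() {}

  /** Serializes {@code source}, reads it back into {@code target}, and checks nothing is left over. */
  public static <T extends Writable> T roundTrip(Writable source, T target) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      source.write(out);
    }
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      target.readFields(in);
      if (in.available() != 0) {
        throw new IOException("Trailing bytes after readFields()");
      }
    }
    return target;
  }
}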
diff --git jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
index e662414..5af2175 100644
--- jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
+++ jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
@@ -76,63 +76,11 @@ public LlapInputFormat(String url, String user, String pwd, String query) {
 
   public LlapInputFormat() {}
 
-  public class LlapInputSplit implements InputSplitWithLocationInfo {
-    InputSplitWithLocationInfo nativeSplit;
-    String inputFormatClassName;
-
-    @Override
-    public long getLength() throws IOException {
-      return nativeSplit.getLength();
-    }
-
-    @Override
-    public String[] getLocations() throws IOException {
-      return nativeSplit.getLocations();
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-      out.writeUTF(inputFormatClassName);
-      out.writeUTF(nativeSplit.getClass().toString());
-      nativeSplit.write(out);
-    }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-      inputFormatClassName = in.readUTF();
-      String splitClass = in.readUTF();
-      try {
-        nativeSplit = (InputSplitWithLocationInfo)Class.forName(splitClass).newInstance();
-      } catch (Exception e) {
-        throw new IOException(e);
-      }
-      nativeSplit.readFields(in);
-    }
-
-    @Override
-    public SplitLocationInfo[] getLocationInfo() throws IOException {
-      return nativeSplit.getLocationInfo();
-    }
-
-    public InputSplit getSplit() {
-      return nativeSplit;
-    }
-
-    public InputFormat getInputFormat() {
-      try {
-        return (InputFormat) Class.forName(inputFormatClassName)
-            .newInstance();
-      } catch(Exception e) {
-        return null;
-      }
-    }
-  }
 
   @Override
   public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
-    try {
-      return ((InputFormat)Class.forName("org.apache.hadoop.hive.llap.LlapInputFormat").newInstance()).getRecordReader(split, job, reporter);
-    } catch (Exception e) { throw new IOException(e); }
+    LlapInputSplit llapSplit = (LlapInputSplit) split;
+    return llapSplit.getInputFormat().getRecordReader(llapSplit.getSplit(), job, reporter);
   }
 
   @Override
@@ -162,9 +110,9 @@ public InputSplit getSplit() {
       while (res.next()) {
         // deserialize split
         DataInput in = new DataInputStream(res.getBinaryStream(3));
-        InputSplit is = (InputSplitWithLocationInfo)Class.forName(res.getString(2)).newInstance(); // todo setAccessible on ctor
+        InputSplitWithLocationInfo is = (InputSplitWithLocationInfo)Class.forName(res.getString(2)).newInstance();
        is.readFields(in);
-        ins.add(is);
+        ins.add(new LlapInputSplit(is, res.getString(1)));
       }
 
       res.close();
@@ -172,7 +120,7 @@ public InputSplit getSplit() {
     } catch (Exception e) {
       throw new IOException(e);
     }
-    return ins.toArray(new InputSplit[ins.size()]); // todo wrap input split with format
+    return ins.toArray(new InputSplit[ins.size()]);
   }
 
   public void close() {
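With the wrapper split moved to its own class, the JDBC-side format now just unwraps and delegates. A rough usage sketch of the client flow (the JDBC URL, credentials, and query below are placeholders, and a reachable HS2/LLAP setup is assumed):

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hive.jdbc.LlapInputFormat;

public class LlapClientSketch {
  public static void main(String[] args) throws Exception {
    LlapInputFormat format = new LlapInputFormat(
        "jdbc:hive2://localhost:10000/default", "hive", "", "select * from some_table");
    JobConf job = new JobConf();

    // getSplits() returns org.apache.hive.jdbc.LlapInputSplit wrappers;
    // getRecordReader() unwraps each one and delegates to the embedded native format.
    for (InputSplit split : format.getSplits(job, 1)) {
      RecordReader reader = format.getRecordReader(split, job, null);
      Text value = (Text) reader.createValue();
      while (reader.next(NullWritable.get(), value)) {
        System.out.println(value);
      }
      reader.close();
    }
  }
}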
diff --git jdbc/src/java/org/apache/hive/jdbc/LlapInputSplit.java jdbc/src/java/org/apache/hive/jdbc/LlapInputSplit.java
new file mode 100644
index 0000000..0f4fd4e
--- /dev/null
+++ jdbc/src/java/org/apache/hive/jdbc/LlapInputSplit.java
@@ -0,0 +1,73 @@
+package org.apache.hive.jdbc;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
+import org.apache.hadoop.mapred.SplitLocationInfo;
+
+
+public class LlapInputSplit implements InputSplitWithLocationInfo {
+  InputSplitWithLocationInfo nativeSplit;
+  String inputFormatClassName;
+
+  public LlapInputSplit() {
+  }
+
+  public LlapInputSplit(InputSplitWithLocationInfo nativeSplit, String inputFormatClassName) {
+    this.nativeSplit = nativeSplit;
+    this.inputFormatClassName = inputFormatClassName;
+  }
+
+  @Override
+  public long getLength() throws IOException {
+    return nativeSplit.getLength();
+  }
+
+  @Override
+  public String[] getLocations() throws IOException {
+    return nativeSplit.getLocations();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(inputFormatClassName);
+    out.writeUTF(nativeSplit.getClass().getName());
+    nativeSplit.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    inputFormatClassName = in.readUTF();
+    String splitClass = in.readUTF();
+    try {
+      nativeSplit = (InputSplitWithLocationInfo)Class.forName(splitClass).newInstance();
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+    nativeSplit.readFields(in);
+  }
+
+  @Override
+  public SplitLocationInfo[] getLocationInfo() throws IOException {
+    return nativeSplit.getLocationInfo();
+  }
+
+  public InputSplit getSplit() {
+    return nativeSplit;
+  }
+
+  public InputFormat getInputFormat() throws IOException {
+    try {
+      return (InputFormat) Class.forName(inputFormatClassName)
+          .newInstance();
+    } catch(Exception e) {
+      throw new IOException(e);
+    }
+  }
+}
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
index cf13c1e..d8066d5 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
@@ -72,7 +72,8 @@ public LlapInputFormat() {
     LlapInputSplit llapSplit = (LlapInputSplit) split;
     SubmitWorkInfo submitWorkInfo = SubmitWorkInfo.fromBytes(llapSplit.getPlanBytes());
 
-    int llapSubmitPort = HiveConf.getIntVar(job, HiveConf.ConfVars.LLAP_DAEMON_RPC_PORT);
+    // TODO HACK: Spark is built with Hive-1.2.1, does not have access to HiveConf.ConfVars.LLAP_DAEMON_RPC_PORT
+    int llapSubmitPort = job.getInt("hive.llap.daemon.rpc.port", 15001);
 
     LOG.info("ZZZ: DBG: Starting LlapTaskUmbilicalExternalClient");
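Because the llap-server hunk now reads the daemon RPC port by string key (with 15001 as the literal fallback) instead of through ConfVars, an external caller that needs a different port has to set the same key on the job. A small sketch (class name is illustrative only):

import org.apache.hadoop.mapred.JobConf;

public class LlapPortConfigSketch {
  public static void main(String[] args) {
    JobConf job = new JobConf();
    // Pin the port explicitly; clients built against older Hive releases
    // (without HiveConf.ConfVars.LLAP_DAEMON_RPC_PORT) can still do this.
    job.setInt("hive.llap.daemon.rpc.port", 15001);
    int llapSubmitPort = job.getInt("hive.llap.daemon.rpc.port", 15001);
    System.out.println("LLAP submit port: " + llapSubmitPort);
  }
}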
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/LlapStorageHandler.java llap-server/src/java/org/apache/hadoop/hive/llap/LlapStorageHandler.java
new file mode 100644
index 0000000..c001cc1
--- /dev/null
+++ llap-server/src/java/org/apache/hadoop/hive/llap/LlapStorageHandler.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.OutputFormat;
+
+public class LlapStorageHandler extends DefaultStorageHandler {
+  @Override
+  public Class getInputFormatClass() {
+    throw new RuntimeException("Should not be called.");
+  }
+
+  @Override
+  public Class getOutputFormatClass() {
+    return LlapOutputFormat.class;
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
index d26a579..2ac0ccd 100644
--- ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
+++ ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
@@ -98,10 +98,6 @@ public void write(DataOutput out) throws IOException {
       byte[] serialzied = serializer.serialize(schema);
       out.writeInt(serialzied.length);
       out.write(serialzied);
-//      AutoExpandingBufferWriteTransport transport = new AutoExpandingBufferWriteTransport(1024, 2d);
-//      TProtocol protocol = new TBinaryProtocol(transport);
-//      schema.write(protocol);
-//      binarySchema = transport.getBuf().array();
     } catch (Exception e) {
       throw new IOException(e);
     }
@@ -135,13 +131,6 @@ public void readFields(DataInput in) throws IOException {
       TDeserializer tDeserializer = new TDeserializer();
       schema = new Schema();
       tDeserializer.deserialize(schema, schemaBytes);
-//      AutoExpandingBufferReadTransport transport = new AutoExpandingBufferReadTransport(length, 2d);
-//      AutoExpandingBuffer buf = transport.getBuf();
-//      in.readFully(buf.array(), 0, length);
-//
-//      TProtocol protocol = new TBinaryProtocol(transport);
-//      schema = new Schema();
-//      schema.read(protocol);
     } catch (Exception e) {
       throw new IOException(e);
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
index 04d26f3..35bca87 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
@@ -49,6 +49,7 @@
 import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
 import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.llap.LlapOutputFormat;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
@@ -270,6 +271,12 @@ public static TableDesc getTableDesc(
       inputFormat = RCFileInputFormat.class;
       outputFormat = RCFileOutputFormat.class;
       assert serdeClass == ColumnarSerDe.class;
+    } else if ("Llap".equalsIgnoreCase(fileFormat)) {
+      inputFormat = TextInputFormat.class;
+      outputFormat = LlapOutputFormat.class;
+      properties.setProperty(
+          org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE,
+          "org.apache.hadoop.hive.llap.LlapStorageHandler");
     } else { // use TextFile by default
       inputFormat = TextInputFormat.class;
       outputFormat = IgnoreKeyTextOutputFormat.class;
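For reference, the handler that PlanUtils wires in via META_TABLE_STORAGE only advertises an output side; reads are expected to go through the LLAP split path instead. A quick sketch of that contract (assumes llap-server on the classpath; the class name of the sketch itself is made up):

import org.apache.hadoop.hive.llap.LlapStorageHandler;

public class LlapStorageHandlerCheck {
  public static void main(String[] args) {
    LlapStorageHandler handler = new LlapStorageHandler();
    // Output side resolves to LlapOutputFormat.
    Class<?> out = handler.getOutputFormatClass();
    System.out.println(out.getName());
    // getInputFormatClass() deliberately throws; this handler is write-only.
  }
}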
diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java
index 9fa4aa8..f69dea3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java
@@ -122,13 +122,13 @@ public ObjectInspector initialize(ObjectInspector[] arguments)
     try {
       if (SessionState.get() != null && SessionState.get().getConf() != null) {
         HiveConf conf = SessionState.get().getConf();
-        jc = new JobConf(conf);
+        jc = DagUtils.getInstance().createConfiguration(conf);
         db = Hive.get(conf);
       } else {
         jc = MapredContext.get().getJobConf();
         db = Hive.get();
       }
-    } catch(HiveException e) {
+    } catch(Exception e) {
       LOG.error("Failed to initialize: ",e);
       throw new UDFArgumentException(e);
     }
@@ -189,7 +189,7 @@ public Object evaluate(DeferredObject[] arguments) throws HiveException {
     try {
       LOG.info("setting fetch.task.conversion to none and query file format to \""+LlapOutputFormat.class.getName()+"\"");
       HiveConf.setVar(conf, ConfVars.HIVEFETCHTASKCONVERSION, "none");
-      HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT, LlapOutputFormat.class.getName());
+      HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT, "Llap");
 
       cpr = driver.compileAndRespond(query);
       if(cpr.getResponseCode() != 0) {
@@ -204,8 +204,6 @@ public Object evaluate(DeferredObject[] arguments) throws HiveException {
         throw new HiveException("Was expecting a single TezTask.");
       }
 
-      Path data = null;
-
       TezWork tezWork = ((TezTask)roots.get(0)).getWork();
 
       if (tezWork.getAllWork().size() != 1) {
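A rough sketch of the compile step evaluate() performs with the two settings it forces (session and metastore setup is only hinted at here, the query string is a placeholder, and this is not the UDF's actual end-to-end code path):

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;

public class CompileForSplitsSketch {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    // Same two settings evaluate() applies before compiling:
    HiveConf.setVar(conf, ConfVars.HIVEFETCHTASKCONVERSION, "none");
    HiveConf.setVar(conf, ConfVars.HIVEQUERYRESULTFILEFORMAT, "Llap");

    SessionState.start(conf);
    Driver driver = new Driver(conf);
    CommandProcessorResponse cpr = driver.compileAndRespond("select * from some_table");
    if (cpr.getResponseCode() != 0) {
      throw new RuntimeException("Failed to compile: " + cpr.getErrorMessage());
    }
    // The UDF then expects exactly one root TezTask in the compiled plan
    // and pulls its TezWork apart to generate the splits.
    System.out.println("Root tasks: " + driver.getPlan().getRootTasks().size());
  }
}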